Sync reorg up to history size (#3874)

* Allow sync reorg up to pruning history size

* Peer difficulty tracking

* Abort downloading a block if it arrives via a NewBlock packet

* Set pruning history to 1200

* Renamed history size field
Authored by Arkadiy Paronyan on 2016-12-23 18:43:40 +01:00; committed by Gav Wood
parent 4516f893e5
commit 5a3c3bcb45
11 changed files with 181 additions and 70 deletions


@@ -1205,7 +1205,7 @@ impl BlockChainClient for Client {
     }

     fn import_block(&self, bytes: Bytes) -> Result<H256, BlockImportError> {
-        use verification::queue::kind::HasHash;
+        use verification::queue::kind::BlockLike;
         use verification::queue::kind::blocks::Unverified;

         // create unverified block here so the `sha3` calculation can be cached.
@@ -1245,7 +1245,9 @@ impl BlockChainClient for Client {
     }

     fn chain_info(&self) -> BlockChainInfo {
-        self.chain.read().chain_info()
+        let mut chain_info = self.chain.read().chain_info();
+        chain_info.pending_total_difficulty = chain_info.total_difficulty + self.block_queue.total_difficulty();
+        chain_info
     }

     fn additional_params(&self) -> BTreeMap<String, String> {
@@ -1369,6 +1371,7 @@ impl BlockChainClient for Client {
         PruningInfo {
             earliest_chain: self.chain.read().first_block_number().unwrap_or(1),
             earliest_state: self.state_db.lock().journal_db().earliest_era().unwrap_or(0),
+            state_history_size: Some(self.history),
         }
     }
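
Note: the `chain_info` hunk above is the producer side of the queue difficulty tracking introduced by this commit: the advertised pending total difficulty becomes the imported chain's total difficulty plus the difficulty of blocks still sitting in the verification queue. A minimal standalone sketch of that bookkeeping (the `ChainInfo` struct and `u128` here are simplified stand-ins for the real ethcore types, not the actual API):

    // Sketch only: illustrative stand-ins, not the real ethcore types.
    struct ChainInfo {
        total_difficulty: u128,         // best imported chain
        pending_total_difficulty: u128, // imported + still queued
    }

    fn chain_info(chain_td: u128, queue_td: u128) -> ChainInfo {
        ChainInfo {
            total_difficulty: chain_td,
            // Counting queued blocks means sync logic will not treat a peer
            // as "ahead" for blocks we already downloaded but not yet verified.
            pending_total_difficulty: chain_td + queue_td,
        }
    }

    fn main() {
        let info = chain_info(1_000_000, 131_072);
        println!("imported td = {}, pending td = {}",
            info.total_difficulty, info.pending_total_difficulty);
    }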


@@ -92,6 +92,8 @@ pub struct TestBlockChainClient {
     pub first_block: RwLock<Option<(H256, u64)>>,
     /// Traces to return
     pub traces: RwLock<Option<Vec<LocalizedTrace>>>,
+    /// Pruning history size to report.
+    pub history: RwLock<Option<u64>>,
 }

 /// Used for generating test client blocks.
@@ -154,6 +156,7 @@ impl TestBlockChainClient {
             ancient_block: RwLock::new(None),
             first_block: RwLock::new(None),
             traces: RwLock::new(None),
+            history: RwLock::new(None),
         };
         client.add_blocks(1, EachBlockWith::Nothing); // add genesis block
         client.genesis_hash = client.last_hash.read().clone();
@@ -314,6 +317,11 @@ impl TestBlockChainClient {
         let res = res.into_iter().next().unwrap().expect("Successful import");
         assert_eq!(res, TransactionImportResult::Current);
     }
+
+    /// Set reported history size.
+    pub fn set_history(&self, h: Option<u64>) {
+        *self.history.write() = h;
+    }
 }

 pub fn get_temp_state_db() -> GuardedTempResult<StateDB> {
@@ -704,6 +712,7 @@ impl BlockChainClient for TestBlockChainClient {
         PruningInfo {
             earliest_chain: 1,
             earliest_state: 1,
+            state_history_size: *self.history.read(),
         }
     }


@@ -28,4 +28,6 @@ pub struct PruningInfo {
     pub earliest_chain: u64,
     /// The first block where state requests may be served.
     pub earliest_state: u64,
+    /// State pruning history size.
+    pub state_history_size: Option<u64>,
 }
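
Downstream consumers treat a missing history size as "no pruning": the downloader below turns `None` into an effectively unlimited reorg depth. A one-line sketch of that convention (hypothetical helper name, same `unwrap_or` arithmetic as the `block_sync.rs` diff further down):

    // `None` = archive node: a reorg may go all the way back to genesis.
    fn max_reorg_depth(state_history_size: Option<u64>) -> u64 {
        state_history_size.unwrap_or(u64::max_value())
    }

    fn main() {
        assert_eq!(max_reorg_depth(Some(1200)), 1200);
        assert_eq!(max_reorg_depth(None), u64::max_value());
    }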


@@ -19,18 +19,21 @@
 use engines::Engine;
 use error::Error;

-use util::{HeapSizeOf, H256};
+use util::{HeapSizeOf, H256, U256};

 pub use self::blocks::Blocks;
 pub use self::headers::Headers;

 /// Something which can produce a hash and a parent hash.
-pub trait HasHash {
+pub trait BlockLike {
     /// Get the hash of this item.
     fn hash(&self) -> H256;

     /// Get the hash of this item's parent.
     fn parent_hash(&self) -> H256;
+
+    /// Get the difficulty of this item.
+    fn difficulty(&self) -> U256;
 }

 /// Defines transitions between stages of verification.
@@ -45,13 +48,13 @@ pub trait HasHash {
 /// consistent.
 pub trait Kind: 'static + Sized + Send + Sync {
     /// The first stage: completely unverified.
-    type Input: Sized + Send + HasHash + HeapSizeOf;
+    type Input: Sized + Send + BlockLike + HeapSizeOf;
     /// The second stage: partially verified.
-    type Unverified: Sized + Send + HasHash + HeapSizeOf;
+    type Unverified: Sized + Send + BlockLike + HeapSizeOf;
     /// The third stage: completely verified.
-    type Verified: Sized + Send + HasHash + HeapSizeOf;
+    type Verified: Sized + Send + BlockLike + HeapSizeOf;

     /// Attempt to create the `Unverified` item from the input.
     fn create(input: Self::Input, engine: &Engine) -> Result<Self::Unverified, Error>;
@@ -62,14 +65,14 @@ pub trait Kind: 'static + Sized + Send + Sync {
 /// The blocks verification module.
 pub mod blocks {
-    use super::{Kind, HasHash};
+    use super::{Kind, BlockLike};

     use engines::Engine;
     use error::Error;
     use header::Header;
     use verification::{PreverifiedBlock, verify_block_basic, verify_block_unordered};
-    use util::{Bytes, HeapSizeOf, H256};
+    use util::{Bytes, HeapSizeOf, H256, U256};

     /// A mode for verifying blocks.
     pub struct Blocks;
@@ -126,7 +129,7 @@ pub mod blocks {
         }
     }

-    impl HasHash for Unverified {
+    impl BlockLike for Unverified {
         fn hash(&self) -> H256 {
             self.header.hash()
         }
@@ -134,9 +137,13 @@ pub mod blocks {
         fn parent_hash(&self) -> H256 {
             self.header.parent_hash().clone()
         }
+
+        fn difficulty(&self) -> U256 {
+            self.header.difficulty().clone()
+        }
     }

-    impl HasHash for PreverifiedBlock {
+    impl BlockLike for PreverifiedBlock {
         fn hash(&self) -> H256 {
             self.header.hash()
         }
@@ -144,12 +151,16 @@ pub mod blocks {
         fn parent_hash(&self) -> H256 {
             self.header.parent_hash().clone()
         }
+
+        fn difficulty(&self) -> U256 {
+            self.header.difficulty().clone()
+        }
     }
 }

 /// Verification for headers.
 pub mod headers {
-    use super::{Kind, HasHash};
+    use super::{Kind, BlockLike};

     use engines::Engine;
     use error::Error;
@@ -157,10 +168,12 @@ pub mod headers {
     use verification::verify_header_params;

     use util::hash::H256;
+    use util::U256;

-    impl HasHash for Header {
+    impl BlockLike for Header {
         fn hash(&self) -> H256 { self.hash() }
         fn parent_hash(&self) -> H256 { self.parent_hash().clone() }
+        fn difficulty(&self) -> U256 { self.difficulty().clone() }
     }

     /// A mode for verifying headers.
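
The rename from `HasHash` to `BlockLike` reflects that queue items now expose difficulty as well as hashes, so the queue can account for difficulty at every verification stage (`Input`, `Unverified`, `Verified`). A toy sketch of how the `BlockLike` bound lets generic code sum difficulty; `u64`/`u128` and `DummyBlock` are simplified stand-ins for `H256`/`U256` and real block types:

    // Toy version of the trait, not the ethcore definition.
    trait BlockLike {
        fn hash(&self) -> u64;
        fn parent_hash(&self) -> u64;
        fn difficulty(&self) -> u128;
    }

    struct DummyBlock { hash: u64, parent: u64, difficulty: u128 }

    impl BlockLike for DummyBlock {
        fn hash(&self) -> u64 { self.hash }
        fn parent_hash(&self) -> u64 { self.parent }
        fn difficulty(&self) -> u128 { self.difficulty }
    }

    // Works for any queue stage bound by BlockLike.
    fn queued_difficulty<B: BlockLike>(items: &[B]) -> u128 {
        items.iter().map(|b| b.difficulty()).sum()
    }

    fn main() {
        let items = [
            DummyBlock { hash: 1, parent: 0, difficulty: 131_072 },
            DummyBlock { hash: 2, parent: 1, difficulty: 131_072 },
        ];
        assert_eq!(queued_difficulty(&items), 262_144);
    }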


@@ -26,7 +26,7 @@ use error::*;
 use engines::Engine;
 use service::*;

-use self::kind::{HasHash, Kind};
+use self::kind::{BlockLike, Kind};

 pub use types::verification_queue_info::VerificationQueueInfo as QueueInfo;

@@ -132,13 +132,14 @@ pub struct VerificationQueue<K: Kind> {
     deleting: Arc<AtomicBool>,
     ready_signal: Arc<QueueSignal>,
     empty: Arc<SCondvar>,
-    processing: RwLock<HashSet<H256>>,
+    processing: RwLock<HashMap<H256, U256>>, // hash to difficulty
     ticks_since_adjustment: AtomicUsize,
     max_queue_size: usize,
     max_mem_use: usize,
     scale_verifiers: bool,
     verifier_handles: Vec<JoinHandle<()>>,
     state: Arc<(Mutex<State>, Condvar)>,
+    total_difficulty: RwLock<U256>,
 }

 struct QueueSignal {
@@ -269,7 +270,7 @@ impl<K: Kind> VerificationQueue<K> {
             more_to_verify: more_to_verify,
             verification: verification,
             deleting: deleting,
-            processing: RwLock::new(HashSet::new()),
+            processing: RwLock::new(HashMap::new()),
             empty: empty,
             ticks_since_adjustment: AtomicUsize::new(0),
             max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
@@ -277,6 +278,7 @@ impl<K: Kind> VerificationQueue<K> {
             scale_verifiers: scale_verifiers,
             verifier_handles: verifier_handles,
             state: state,
+            total_difficulty: RwLock::new(0.into()),
         }
     }

@@ -434,6 +436,7 @@ impl<K: Kind> VerificationQueue<K> {
         sizes.unverified.store(0, AtomicOrdering::Release);
         sizes.verifying.store(0, AtomicOrdering::Release);
         sizes.verified.store(0, AtomicOrdering::Release);
+        *self.total_difficulty.write() = 0.into();

         self.processing.write().clear();
     }
@@ -448,7 +451,7 @@ impl<K: Kind> VerificationQueue<K> {

     /// Check if the item is currently in the queue
     pub fn status(&self, hash: &H256) -> Status {
-        if self.processing.read().contains(hash) {
+        if self.processing.read().contains_key(hash) {
             return Status::Queued;
         }
         if self.verification.bad.lock().contains(hash) {
@@ -461,7 +464,7 @@ impl<K: Kind> VerificationQueue<K> {
     pub fn import(&self, input: K::Input) -> ImportResult {
         let h = input.hash();
         {
-            if self.processing.read().contains(&h) {
+            if self.processing.read().contains_key(&h) {
                 return Err(ImportError::AlreadyQueued.into());
             }

@@ -480,7 +483,11 @@ impl<K: Kind> VerificationQueue<K> {
             Ok(item) => {
                 self.verification.sizes.unverified.fetch_add(item.heap_size_of_children(), AtomicOrdering::SeqCst);

-                self.processing.write().insert(h.clone());
+                self.processing.write().insert(h.clone(), item.difficulty());
+                {
+                    let mut td = self.total_difficulty.write();
+                    *td = *td + item.difficulty();
+                }
                 self.verification.unverified.lock().push_back(item);
                 self.more_to_verify.notify_all();
                 Ok(h)
@@ -511,7 +518,10 @@ impl<K: Kind> VerificationQueue<K> {
         bad.reserve(hashes.len());
         for hash in hashes {
             bad.insert(hash.clone());
-            processing.remove(hash);
+            if let Some(difficulty) = processing.remove(hash) {
+                let mut td = self.total_difficulty.write();
+                *td = *td - difficulty;
+            }
         }

         let mut new_verified = VecDeque::new();
@@ -520,7 +530,10 @@ impl<K: Kind> VerificationQueue<K> {
             if bad.contains(&output.parent_hash()) {
                 removed_size += output.heap_size_of_children();
                 bad.insert(output.hash());
-                processing.remove(&output.hash());
+                if let Some(difficulty) = processing.remove(&output.hash()) {
+                    let mut td = self.total_difficulty.write();
+                    *td = *td - difficulty;
+                }
             } else {
                 new_verified.push_back(output);
             }
@@ -538,7 +551,10 @@ impl<K: Kind> VerificationQueue<K> {
         }
         let mut processing = self.processing.write();
         for hash in hashes {
-            processing.remove(hash);
+            if let Some(difficulty) = processing.remove(hash) {
+                let mut td = self.total_difficulty.write();
+                *td = *td - difficulty;
+            }
         }
         processing.is_empty()
     }
@@ -592,6 +608,11 @@ impl<K: Kind> VerificationQueue<K> {
         }
     }

+    /// Get the total difficulty of all the blocks in the queue.
+    pub fn total_difficulty(&self) -> U256 {
+        self.total_difficulty.read().clone()
+    }
+
     /// Get the current number of working verifiers.
     pub fn num_verifiers(&self) -> usize {
         match *self.state.0.lock() {
@@ -760,6 +781,22 @@ mod tests {
         }
     }

+    #[test]
+    fn returns_total_difficulty() {
+        let queue = get_test_queue(false);
+        let block = get_good_dummy_block();
+        let hash = BlockView::new(&block).header().hash().clone();
+        if let Err(e) = queue.import(Unverified::new(block)) {
+            panic!("error importing block that is valid by definition({:?})", e);
+        }
+        queue.flush();
+        assert_eq!(queue.total_difficulty(), 131072.into());
+        queue.drain(10);
+        assert_eq!(queue.total_difficulty(), 131072.into());
+        queue.mark_as_good(&[ hash ]);
+        assert_eq!(queue.total_difficulty(), 0.into());
+    }
+
     #[test]
     fn returns_ok_for_drained_duplicates() {
         let queue = get_test_queue(false);
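
The queue changes above replace the `HashSet` of in-flight hashes with a `HashMap` from hash to difficulty, and keep `total_difficulty` equal to the sum of the map's values across import, bad-marking, and draining. A toy model of that invariant (plain std types; `u64`/`u128` stand in for `H256`/`U256`, and the struct is illustrative, not the real queue):

    use std::collections::HashMap;

    // Invariant: `total` always equals the sum of `processing`'s values.
    struct QueueAccounting {
        processing: HashMap<u64, u128>, // hash -> difficulty
        total: u128,
    }

    impl QueueAccounting {
        fn new() -> Self {
            QueueAccounting { processing: HashMap::new(), total: 0 }
        }

        fn import(&mut self, hash: u64, difficulty: u128) {
            // Only count a hash once, mirroring the AlreadyQueued check.
            if self.processing.insert(hash, difficulty).is_none() {
                self.total += difficulty;
            }
        }

        // Used both when a block turns out bad and when it leaves the queue.
        fn remove(&mut self, hash: &u64) {
            if let Some(difficulty) = self.processing.remove(hash) {
                self.total -= difficulty;
            }
        }
    }

    fn main() {
        let mut q = QueueAccounting::new();
        q.import(0xaa, 131_072);
        q.import(0xbb, 131_072);
        assert_eq!(q.total, 262_144);
        q.remove(&0xaa);
        assert_eq!(q.total, 131_072);
    }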


@@ -93,7 +93,7 @@ notify_work = ["http://localhost:3001"]
 [footprint]
 tracing = "auto"
 pruning = "auto"
-pruning_history = 64
+pruning_history = 1200
 cache_size_db = 64
 cache_size_blocks = 8
 cache_size_queue = 50


@@ -237,7 +237,7 @@ usage! {
         or |c: &Config| otry!(c.footprint).tracing.clone(),
     flag_pruning: String = "auto",
         or |c: &Config| otry!(c.footprint).pruning.clone(),
-    flag_pruning_history: u64 = 64u64,
+    flag_pruning_history: u64 = 1200u64,
         or |c: &Config| otry!(c.footprint).pruning_history.clone(),
     flag_cache_size_db: u32 = 64u32,
         or |c: &Config| otry!(c.footprint).cache_size_db.clone(),
@@ -629,7 +629,7 @@ mod tests {
         // -- Footprint Options
         flag_tracing: "auto".into(),
         flag_pruning: "auto".into(),
-        flag_pruning_history: 64u64,
+        flag_pruning_history: 1200u64,
         flag_cache_size_db: 64u32,
         flag_cache_size_blocks: 8u32,
         flag_cache_size_queue: 50u32,


@@ -898,7 +898,7 @@ mod tests {
             file_path: Some("blockchain.json".into()),
             format: Default::default(),
             pruning: Default::default(),
-            pruning_history: 64,
+            pruning_history: 1200,
             compaction: Default::default(),
             wal: true,
             tracing: Default::default(),
@@ -920,7 +920,7 @@
             dirs: Default::default(),
             file_path: Some("blockchain.json".into()),
             pruning: Default::default(),
-            pruning_history: 64,
+            pruning_history: 1200,
             format: Default::default(),
             compaction: Default::default(),
             wal: true,
@@ -942,7 +942,7 @@
             dirs: Default::default(),
             file_path: Some("state.json".into()),
             pruning: Default::default(),
-            pruning_history: 64,
+            pruning_history: 1200,
             format: Default::default(),
             compaction: Default::default(),
             wal: true,
@@ -966,7 +966,7 @@
             dirs: Default::default(),
             file_path: Some("blockchain.json".into()),
             pruning: Default::default(),
-            pruning_history: 64,
+            pruning_history: 1200,
             format: Some(DataFormat::Hex),
             compaction: Default::default(),
             wal: true,
@@ -1001,7 +1001,7 @@
             dirs: Default::default(),
             spec: Default::default(),
             pruning: Default::default(),
-            pruning_history: 64,
+            pruning_history: 1200,
             daemon: None,
             logger_config: Default::default(),
             miner_options: Default::default(),


@@ -32,9 +32,8 @@ const MAX_HEADERS_TO_REQUEST: usize = 128;
 const MAX_BODIES_TO_REQUEST: usize = 64;
 const MAX_RECEPITS_TO_REQUEST: usize = 128;
 const SUBCHAIN_SIZE: u64 = 256;
-const MAX_ROUND_PARENTS: usize = 32;
+const MAX_ROUND_PARENTS: usize = 16;
 const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5;
-const MAX_REORG_BLOCKS: u64 = 20;

 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
 /// Downloader state
@@ -95,27 +94,38 @@ pub struct BlockDownloader {
     last_imported_hash: H256,
     /// Number of blocks imported this round
     imported_this_round: Option<usize>,
+    /// Block number the last round started with.
+    last_round_start: BlockNumber,
+    last_round_start_hash: H256,
     /// Block parents imported this round (hash, parent)
     round_parents: VecDeque<(H256, H256)>,
     /// Do we need to download block receipts.
     download_receipts: bool,
     /// Sync up to the block with this hash.
     target_hash: Option<H256>,
+    /// Reorganize up to this many blocks. Up to genesis if `None`.
+    max_reorg_blocks: Option<BlockNumber>,
+    /// Probing range for seeking common best block.
+    retract_step: u64,
 }

 impl BlockDownloader {
     /// Create a new instance of syncing strategy.
-    pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> BlockDownloader {
+    pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber, max_reorg: Option<BlockNumber>) -> BlockDownloader {
         BlockDownloader {
             state: State::Idle,
             highest_block: None,
             last_imported_block: start_number,
             last_imported_hash: start_hash.clone(),
+            last_round_start: start_number,
+            last_round_start_hash: start_hash.clone(),
             blocks: BlockCollection::new(sync_receipts),
             imported_this_round: None,
             round_parents: VecDeque::new(),
             download_receipts: sync_receipts,
             target_hash: None,
+            max_reorg_blocks: max_reorg,
+            retract_step: 1,
         }
     }

@@ -127,9 +137,12 @@ impl BlockDownloader {
     /// Mark a block as known in the chain
     pub fn mark_as_known(&mut self, hash: &H256, number: BlockNumber) {
-        if number == self.last_imported_block + 1 {
+        if number >= self.last_imported_block + 1 {
             self.last_imported_block = number;
             self.last_imported_hash = hash.clone();
+            self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + 1);
+            self.last_round_start = number;
+            self.last_round_start_hash = hash.clone();
         }
     }

@@ -148,12 +161,6 @@ impl BlockDownloader {
         self.target_hash = Some(hash.clone());
     }

-    /// Set starting sync block
-    pub fn _set_start(&mut self, hash: &H256, number: BlockNumber) {
-        self.last_imported_hash = hash.clone();
-        self.last_imported_block = number;
-    }
-
     /// Unmark header as being downloaded.
     pub fn clear_header_download(&mut self, hash: &H256) {
         self.blocks.clear_header_download(hash)
@@ -172,6 +179,7 @@ impl BlockDownloader {
     pub fn reset_to(&mut self, hashes: Vec<H256>) {
         self.reset();
         self.blocks.reset_to(hashes);
+        self.state = State::Blocks;
     }

     /// Returns used heap memory size.
@@ -260,7 +268,7 @@ impl BlockDownloader {
             return Ok(DownloadAction::Reset);
         } else {
             let best = io.chain().chain_info().best_block_number;
-            if best > self.last_imported_block && best - self.last_imported_block > MAX_REORG_BLOCKS {
+            if best > self.last_imported_block && (self.last_imported_block == 0 || best - self.last_imported_block > self.max_reorg_blocks.unwrap_or(u64::max_value())) {
                 trace!(target: "sync", "No common block, disabling peer");
                 return Err(BlockDownloaderImportError::Invalid);
             }
@@ -336,39 +344,47 @@ impl BlockDownloader {
     fn start_sync_round(&mut self, io: &mut SyncIo) {
         self.state = State::ChainHead;
-        trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block);
+        trace!(target: "sync", "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block);
         // Check if need to retract to find the common block. The problem is that the peers still return headers by hash even
         // from the non-canonical part of the tree. So we also retract if nothing has been imported last round.
+        let start = self.last_round_start;
+        let start_hash = self.last_round_start_hash;
         match self.imported_this_round {
-            Some(n) if n == 0 && self.last_imported_block > 0 => {
+            Some(n) if n == 0 && start > 0 => {
                 // nothing was imported last round, step back to a previous block
                 // search parent in last round known parents first
-                if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == self.last_imported_hash) {
-                    self.last_imported_block -= 1;
+                if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == start_hash) {
+                    self.last_imported_block = start - 1;
                     self.last_imported_hash = p.clone();
                     trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash);
                 } else {
                     let best = io.chain().chain_info().best_block_number;
-                    if best > self.last_imported_block && best - self.last_imported_block > MAX_REORG_BLOCKS {
-                        debug!(target: "sync", "Could not revert to previous ancient block, last: {} ({})", self.last_imported_block, self.last_imported_hash);
+                    if best > start && (start == 0 || best - start > self.max_reorg_blocks.unwrap_or(u64::max_value())) {
+                        debug!(target: "sync", "Could not revert to previous ancient block, last: {} ({})", start, start_hash);
                         self.reset();
                     } else {
-                        match io.chain().block_hash(BlockId::Number(self.last_imported_block - 1)) {
+                        let n = start - min(self.retract_step, start);
+                        self.retract_step *= 2;
+                        match io.chain().block_hash(BlockId::Number(n)) {
                             Some(h) => {
-                                self.last_imported_block -= 1;
+                                self.last_imported_block = n;
                                 self.last_imported_hash = h;
-                                trace!(target: "sync", "Searching common header in the blockchain {} ({})", self.last_imported_block, self.last_imported_hash);
+                                trace!(target: "sync", "Searching common header in the blockchain {} ({})", start, self.last_imported_hash);
                             }
                             None => {
-                                debug!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash);
+                                debug!(target: "sync", "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash);
                                 self.reset();
                             }
                         }
                     }
                 }
             },
-            _ => (),
+            _ => {
+                self.retract_step = 1;
+            },
         }
+        self.last_round_start = self.last_imported_block;
+        self.last_round_start_hash = self.last_imported_hash;
         self.imported_this_round = None;
     }

@@ -474,6 +490,9 @@ impl BlockDownloader {
                 self.block_imported(&h, number, &parent);
             },
             Err(BlockImportError::Block(BlockError::UnknownParent(_))) if allow_out_of_order => {
+                break;
+            },
+            Err(BlockImportError::Block(BlockError::UnknownParent(_))) => {
                 trace!(target: "sync", "Unknown new block parent, restarting sync");
                 break;
             },
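
With the fixed `MAX_REORG_BLOCKS = 20` cap gone, the downloader may have to walk back up to the full pruning history (1200 blocks by default) to find a common ancestor, so the old one-block-per-round retraction is replaced by a doubling `retract_step`. A runnable sketch of the probe sequence, using the same arithmetic as the hunk above (the free function and loop are illustrative, not the real code structure):

    use std::cmp::min;

    // Mirrors: let n = start - min(self.retract_step, start);
    //          self.retract_step *= 2;
    fn next_probe(start: u64, retract_step: &mut u64) -> u64 {
        let n = start - min(*retract_step, start);
        *retract_step *= 2;
        n
    }

    fn main() {
        let (mut at, mut step) = (1200u64, 1u64);
        // Fruitless rounds probe 1199, 1197, 1193, 1185, 1169, ... so a fork
        // d blocks deep is found in O(log d) rounds instead of d.
        while at > 0 {
            at = next_probe(at, &mut step);
            println!("probing block #{}", at);
        }
    }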


@@ -372,6 +372,7 @@ impl ChainSync {
     /// Create a new instance of syncing strategy.
     pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync {
         let chain_info = chain.chain_info();
+        let pruning = chain.pruning_info();
         let mut sync = ChainSync {
             state: if config.warp_sync { SyncState::WaitingPeers } else { SyncState::Idle },
             starting_block: chain.chain_info().best_block_number,
@@ -379,7 +380,7 @@ impl ChainSync {
             peers: HashMap::new(),
             handshaking_peers: HashMap::new(),
             active_peers: HashSet::new(),
-            new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number),
+            new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number, pruning.state_history_size),
             old_blocks: None,
             last_sent_block_number: 0,
             network_id: config.network_id,
@@ -459,6 +460,7 @@ impl ChainSync {
     fn reset(&mut self, io: &mut SyncIo) {
         self.new_blocks.reset();
         self.snapshot.clear();
+        let chain_info = io.chain().chain_info();
         if self.state == SyncState::SnapshotData {
             debug!(target:"sync", "Aborting snapshot restore");
             io.snapshot_service().abort_restore();
@@ -466,6 +468,10 @@ impl ChainSync {
         for (_, ref mut p) in &mut self.peers {
             if p.block_set != Some(BlockSet::OldBlocks) {
                 p.reset_asking();
+                if p.difficulty.is_none() {
+                    // assume peer has up to date difficulty
+                    p.difficulty = Some(chain_info.pending_total_difficulty);
+                }
             }
         }
         self.state = SyncState::Idle;
@@ -557,14 +563,15 @@ impl ChainSync {
     /// Update sync after the blockchain has been changed externally.
     pub fn update_targets(&mut self, chain: &BlockChainClient) {
         // Do not assume that the block queue/chain still has our last_imported_block
+        let pruning = chain.pruning_info();
         let chain = chain.chain_info();
-        self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number);
+        self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number, pruning.state_history_size);
         self.old_blocks = None;
         if self.download_old_blocks {
             if let (Some(ancient_block_hash), Some(ancient_block_number)) = (chain.ancient_block_hash, chain.ancient_block_number) {
                 trace!(target: "sync", "Downloading old blocks from {:?} (#{}) till {:?} (#{:?})", ancient_block_hash, ancient_block_number, chain.first_block_hash, chain.first_block_number);
-                let mut downloader = BlockDownloader::new(true, &ancient_block_hash, ancient_block_number);
+                let mut downloader = BlockDownloader::new(true, &ancient_block_hash, ancient_block_number, pruning.state_history_size);
                 if let Some(hash) = chain.first_block_hash {
                     trace!(target: "sync", "Downloader target set to {:?}", hash);
                     downloader.set_target(&hash);
@@ -860,6 +867,12 @@ impl ChainSync {
             trace!(target: "sync", "Ignoring new block from unconfirmed peer {}", peer_id);
             return Ok(());
         }
+        let difficulty: U256 = try!(r.val_at(1));
+        if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
+            if peer.difficulty.map_or(true, |pd| difficulty > pd) {
+                peer.difficulty = Some(difficulty);
+            }
+        }
         let block_rlp = try!(r.at(0));
         let header_rlp = try!(block_rlp.at(0));
         let h = header_rlp.as_raw().sha3();
@@ -888,6 +901,8 @@ impl ChainSync {
                 trace!(target: "sync", "New block already queued {:?}", h);
             },
             Ok(_) => {
+                // abort current download of the same block
+                self.complete_sync(io);
                 self.new_blocks.mark_as_known(&header.hash(), header.number());
                 trace!(target: "sync", "New block queued {:?} ({})", h, header.number());
             },
@@ -906,16 +921,10 @@ impl ChainSync {
         } else {
             trace!(target: "sync", "New unknown block {:?}", h);
             //TODO: handle too many unknown blocks
-            let difficulty: U256 = try!(r.val_at(1));
-            if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
-                if peer.difficulty.map_or(true, |pd| difficulty > pd) {
-                    peer.difficulty = Some(difficulty);
-                    trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h);
-                }
-            }
             self.sync_peer(io, peer_id, true);
         }
+        self.continue_sync(io);
         Ok(())
     }

@@ -925,16 +934,24 @@ impl ChainSync {
             trace!(target: "sync", "Ignoring new hashes from unconfirmed peer {}", peer_id);
             return Ok(());
         }
+        let hashes: Vec<_> = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::<H256>(0), item.val_at::<BlockNumber>(1))).collect();
+        if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
+            // Peer has new blocks with unknown difficulty
+            peer.difficulty = None;
+            if let Some(&(Ok(ref h), _)) = hashes.last() {
+                peer.latest_hash = h.clone();
+            }
+        }
         if self.state != SyncState::Idle {
             trace!(target: "sync", "Ignoring new hashes since we're already downloading.");
             let max = r.iter().take(MAX_NEW_HASHES).map(|item| item.val_at::<BlockNumber>(1).unwrap_or(0)).fold(0u64, max);
             if max > self.highest_block.unwrap_or(0) {
                 self.highest_block = Some(max);
             }
+            self.continue_sync(io);
             return Ok(());
         }
         trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count());
-        let hashes = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::<H256>(0), item.val_at::<BlockNumber>(1)));
         let mut max_height: BlockNumber = 0;
         let mut new_hashes = Vec::new();
         let last_imported_number = self.new_blocks.last_imported_block_number();
@@ -982,6 +999,7 @@ impl ChainSync {
             self.state = SyncState::NewBlocks;
             self.sync_peer(io, peer_id, true);
         }
+        self.continue_sync(io);
         Ok(())
     }

@@ -1106,7 +1124,7 @@ impl ChainSync {
         thread_rng().shuffle(&mut peers); //TODO: sort by rating
         // prefer peers with higher protocol version
         peers.sort_by(|&(_, _, ref v1), &(_, _, ref v2)| v1.cmp(v2));
-        trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
+        trace!(target: "sync", "Syncing with peers: {} active, {} confirmed, {} total", self.active_peers.len(), peers.len(), self.peers.len());
         for (p, _, _) in peers {
             if self.active_peers.contains(&p) {
                 self.sync_peer(io, p, false);
@@ -1135,12 +1153,13 @@ impl ChainSync {
     /// Find something to do for a peer. Called for a new peer or when a peer is done with its task.
     fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) {
         if !self.active_peers.contains(&peer_id) {
-            trace!(target: "sync", "Skipping deactivated peer");
+            trace!(target: "sync", "Skipping deactivated peer {}", peer_id);
             return;
         }
         let (peer_latest, peer_difficulty, peer_snapshot_number, peer_snapshot_hash) = {
             if let Some(peer) = self.peers.get_mut(&peer_id) {
                 if peer.asking != PeerAsking::Nothing || !peer.can_sync() {
+                    trace!(target: "sync", "Skipping busy peer {}", peer_id);
                     return;
                 }
                 if self.state == SyncState::Waiting {
@@ -1161,7 +1180,7 @@ impl ChainSync {
         let num_active_peers = self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count();

         let higher_difficulty = peer_difficulty.map_or(true, |pd| pd > syncing_difficulty);
-        if force || self.state == SyncState::NewBlocks || higher_difficulty || self.old_blocks.is_some() {
+        if force || higher_difficulty || self.old_blocks.is_some() {
             match self.state {
                 SyncState::WaitingPeers => {
                     trace!(target: "sync", "Checking snapshot sync: {} vs {}", peer_snapshot_number, chain_info.best_block_number);
@@ -1174,9 +1193,10 @@ impl ChainSync {
                 }

                 let have_latest = io.chain().block_status(BlockId::Hash(peer_latest)) != BlockStatus::Unknown;
+                trace!(target: "sync", "Considering peer {}, force={}, td={:?}, our td={}, latest={}, have_latest={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, peer_latest, have_latest, self.state);
                 if !have_latest && (higher_difficulty || force || self.state == SyncState::NewBlocks) {
                     // check if got new blocks to download
-                    trace!(target: "sync", "Syncing with {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state);
+                    trace!(target: "sync", "Syncing with peer {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state);
                     if let Some(request) = self.new_blocks.request_blocks(io, num_active_peers) {
                         self.request_blocks(io, peer_id, request, BlockSet::NewBlocks);
                         if self.state == SyncState::Idle {
@@ -1206,6 +1226,8 @@ impl ChainSync {
                 SyncState::SnapshotManifest | //already downloading from other peer
                 SyncState::Waiting | SyncState::SnapshotWaiting => ()
             }
+        } else {
+            trace!(target: "sync", "Skipping peer {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state);
         }
     }

@@ -2035,7 +2057,9 @@ impl ChainSync {
     /// called when block is imported to chain - propagates the blocks and updates transactions sent to peers
     pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256], sealed: &[H256], proposed: &[Bytes]) {
-        if io.is_chain_queue_empty() {
+        let queue_info = io.chain().queue_info();
+        if !self.status().is_syncing(queue_info) || !sealed.is_empty() {
+            trace!(target: "sync", "Propagating blocks, state={:?}", self.state);
             self.propagate_latest_blocks(io, sealed);
             self.propagate_proposed_blocks(io, proposed);
         }
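
This is the consumer side of the peer difficulty tracking: `NewBlock` updates a peer's known difficulty, `NewHashes` resets it to `None` (new blocks of unknown difficulty), `reset` optimistically assumes up-to-date peers, and `sync_peer` only bothers with a peer believed to be ahead. A condensed sketch of that selection predicate (illustrative free function; `u128` stands in for `U256`):

    // Condensed form of the sync_peer guard above.
    fn should_sync_with(force: bool, peer_td: Option<u128>, our_td: u128, old_blocks_pending: bool) -> bool {
        // `None` means the peer's difficulty is unknown (e.g. it just
        // announced new hashes) and is treated as potentially higher.
        let higher_difficulty = peer_td.map_or(true, |pd| pd > our_td);
        force || higher_difficulty || old_blocks_pending
    }

    fn main() {
        assert!(should_sync_with(false, None, 100, false));      // unknown td: try it
        assert!(should_sync_with(false, Some(200), 100, false)); // peer ahead
        assert!(!should_sync_with(false, Some(50), 100, false)); // peer behind: skip
    }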


@@ -255,8 +255,12 @@ fn high_td_attach() {
 fn disconnect_on_unrelated_chain() {
     ::env_logger::init().ok();
     let mut net = TestNet::new(2);
-    net.peer(0).chain.add_blocks(200, EachBlockWith::Uncle);
-    net.peer(1).chain.add_blocks(100, EachBlockWith::Nothing);
+    net.peer(0).chain.set_history(Some(20));
+    net.peer(1).chain.set_history(Some(20));
+    net.restart_peer(0);
+    net.restart_peer(1);
+    net.peer(0).chain.add_blocks(500, EachBlockWith::Uncle);
+    net.peer(1).chain.add_blocks(300, EachBlockWith::Nothing);
     net.sync();
     assert_eq!(net.disconnect_events, vec![(0, 0)]);
 }