From 84021a6148b1463e46aa6a2a2dca1578910bd29b Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Tue, 27 Dec 2016 15:12:03 +0100 Subject: [PATCH] Backporting to beta (#3980) * v1.4.7 * Allow sync reorg up to pruning history size * Peer difficulty tracking * Abort downloading block if received with NewBlock * Fixed test Former-commit-id: f2058bdd2feefe3651134aa07e6ce9e3e041fbec --- Cargo.lock | 28 +++++------ Cargo.toml | 2 +- ethcore/src/client/client.rs | 13 ++++- ethcore/src/client/test_client.rs | 15 ++++++ ethcore/src/client/traits.rs | 4 ++ ethcore/src/types/mod.rs.in | 3 +- ethcore/src/types/pruning_info.rs | 29 +++++++++++ ethcore/src/verification/queue/kind.rs | 35 ++++++++----- ethcore/src/verification/queue/mod.rs | 58 ++++++++++++++++++---- mac/Parity.pkgproj | 2 +- nsis/installer.nsi | 2 +- sync/src/block_sync.rs | 63 +++++++++++++++--------- sync/src/chain.rs | 68 +++++++++++++++++++------- sync/src/tests/chain.rs | 8 ++- util/Cargo.toml | 2 +- 15 files changed, 247 insertions(+), 85 deletions(-) create mode 100644 ethcore/src/types/pruning_info.rs diff --git a/Cargo.lock b/Cargo.lock index 90112d209..0127cab22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ [root] name = "parity" -version = "1.4.6" +version = "1.4.7" dependencies = [ "ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "clippy 0.0.96 (registry+https://github.com/rust-lang/crates.io-index)", @@ -21,7 +21,7 @@ dependencies = [ "ethcore-rpc 1.4.0", "ethcore-signer 1.4.0", "ethcore-stratum 1.4.0", - "ethcore-util 1.4.6", + "ethcore-util 1.4.7", "ethsync 1.4.0", "fdlimit 0.1.0", "hyper 0.9.10 (registry+https://github.com/rust-lang/crates.io-index)", @@ -289,7 +289,7 @@ dependencies = [ "ethcore-ipc 1.4.0", "ethcore-ipc-codegen 1.4.0", "ethcore-ipc-nano 1.4.0", - "ethcore-util 1.4.6", + "ethcore-util 1.4.7", "ethjson 0.1.0", "ethkey 0.2.0", "ethstore 0.1.0", @@ -336,7 +336,7 @@ dependencies = [ "ethabi 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-devtools 1.4.0", "ethcore-rpc 1.4.0", - "ethcore-util 1.4.6", + "ethcore-util 1.4.7", "fetch 0.1.0", "hyper 0.9.4 (git+https://github.com/ethcore/hyper)", "jsonrpc-core 3.0.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -381,7 +381,7 @@ name = "ethcore-ipc" version = "1.4.0" dependencies = [ "ethcore-devtools 1.4.0", - "ethcore-util 1.4.6", + "ethcore-util 1.4.7", "nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)", "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -428,7 +428,7 @@ dependencies = [ "ethcore-ipc 1.4.0", "ethcore-ipc-codegen 1.4.0", "ethcore-ipc-nano 1.4.0", - "ethcore-util 1.4.6", + "ethcore-util 1.4.7", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "nanomsg 0.5.1 (git+https://github.com/ethcore/nanomsg.rs.git)", "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -439,7 +439,7 @@ name = "ethcore-logger" version = "1.4.0" dependencies = [ "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", - "ethcore-util 1.4.6", + "ethcore-util 1.4.7", "isatty 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -455,7 +455,7 @@ dependencies = [ "bytes 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-devtools 1.4.0", "ethcore-io 1.4.0", - "ethcore-util 1.4.6", + "ethcore-util 1.4.7", "ethcrypto 0.1.0", "ethkey 0.2.0", "igd 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -482,7 +482,7 @@ dependencies = [ "ethcore-devtools 1.4.0", "ethcore-io 1.4.0", "ethcore-ipc 1.4.0", - "ethcore-util 1.4.6", + "ethcore-util 1.4.7", "ethcrypto 0.1.0", "ethjson 0.1.0", "ethkey 0.2.0", @@ -511,7 +511,7 @@ dependencies = [ "ethcore-devtools 1.4.0", "ethcore-io 1.4.0", "ethcore-rpc 1.4.0", - "ethcore-util 1.4.6", + "ethcore-util 1.4.7", "jsonrpc-core 3.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-dapps-glue 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -530,7 +530,7 @@ dependencies = [ "ethcore-ipc 1.4.0", "ethcore-ipc-codegen 1.4.0", "ethcore-ipc-nano 1.4.0", - "ethcore-util 1.4.6", + "ethcore-util 1.4.7", "json-tcp-server 0.1.0 (git+https://github.com/ethcore/json-tcp-server)", "jsonrpc-core 3.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -541,7 +541,7 @@ dependencies = [ [[package]] name = "ethcore-util" -version = "1.4.6" +version = "1.4.7" dependencies = [ "ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "arrayvec 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)", @@ -590,7 +590,7 @@ dependencies = [ name = "ethjson" version = "0.1.0" dependencies = [ - "ethcore-util 1.4.6", + "ethcore-util 1.4.7", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", "serde 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)", "serde_codegen 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -643,7 +643,7 @@ dependencies = [ "ethcore-ipc-codegen 1.4.0", "ethcore-ipc-nano 1.4.0", "ethcore-network 1.4.0", - "ethcore-util 1.4.6", + "ethcore-util 1.4.7", "heapsize 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 4b11484de..4d31b99b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] description = "Ethcore client." name = "parity" -version = "1.4.6" +version = "1.4.7" license = "GPL-3.0" authors = ["Ethcore "] build = "build.rs" diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index ec59e01cf..b2aaa64e5 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -46,6 +46,7 @@ use transaction::{LocalizedTransaction, SignedTransaction, Action}; use blockchain::extras::TransactionAddress; use types::filter::Filter; use types::mode::Mode as IpcMode; +use types::pruning_info::PruningInfo; use log_entry::LocalizedLogEntry; use verification::queue::BlockQueue; use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute}; @@ -1081,7 +1082,7 @@ impl BlockChainClient for Client { } fn import_block(&self, bytes: Bytes) -> Result { - use verification::queue::kind::HasHash; + use verification::queue::kind::BlockLike; use verification::queue::kind::blocks::Unverified; // create unverified block here so the `sha3` calculation can be cached. @@ -1121,7 +1122,9 @@ impl BlockChainClient for Client { } fn chain_info(&self) -> BlockChainInfo { - self.chain.read().chain_info() + let mut chain_info = self.chain.read().chain_info(); + chain_info.pending_total_difficulty = chain_info.total_difficulty + self.block_queue.total_difficulty(); + chain_info } fn additional_params(&self) -> BTreeMap { @@ -1226,6 +1229,12 @@ impl BlockChainClient for Client { self.uncle(id) .map(|header| self.engine.extra_info(&decode(&header))) } + + fn pruning_info(&self) -> PruningInfo { + PruningInfo { + history_size: Some(self.history), + } + } } impl MiningBlockChainClient for Client { diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index ea220fb1c..3b72a96b3 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -38,6 +38,7 @@ use evm::{Factory as EvmFactory, VMType, Schedule}; use miner::{Miner, MinerService, TransactionImportResult}; use spec::Spec; use types::mode::Mode; +use types::pruning_info::PruningInfo; use views::BlockView; use verification::queue::QueueInfo; @@ -89,6 +90,8 @@ pub struct TestBlockChainClient { pub ancient_block: RwLock>, /// First block info. pub first_block: RwLock>, + /// Pruning history size to report. + pub history: RwLock>, } #[derive(Clone)] @@ -140,6 +143,7 @@ impl TestBlockChainClient { latest_block_timestamp: RwLock::new(10_000_000), ancient_block: RwLock::new(None), first_block: RwLock::new(None), + history: RwLock::new(None), }; client.add_blocks(1, EachBlockWith::Nothing); // add genesis block client.genesis_hash = client.last_hash.read().clone(); @@ -300,6 +304,11 @@ impl TestBlockChainClient { let res = res.into_iter().next().unwrap().expect("Successful import"); assert_eq!(res, TransactionImportResult::Current); } + + /// Set reported history size. + pub fn set_history(&self, h: Option) { + *self.history.write() = h; + } } pub fn get_temp_state_db() -> GuardedTempResult { @@ -650,4 +659,10 @@ impl BlockChainClient for TestBlockChainClient { fn mode(&self) -> Mode { Mode::Active } fn set_mode(&self, _: Mode) { unimplemented!(); } + + fn pruning_info(&self) -> PruningInfo { + PruningInfo { + history_size: *self.history.read(), + } + } } diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index 67092e986..34bb27726 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -39,6 +39,7 @@ use types::call_analytics::CallAnalytics; use types::blockchain_info::BlockChainInfo; use types::block_status::BlockStatus; use types::mode::Mode; +use types::pruning_info::PruningInfo; #[ipc(client_ident="RemoteClient")] /// Blockchain database client. Owns and manages a blockchain and a block queue. @@ -241,6 +242,9 @@ pub trait BlockChainClient : Sync + Send { /// Returns engine-related extra info for `UncleID`. fn uncle_extra_info(&self, id: UncleID) -> Option>; + + /// Get pruning settings. + fn pruning_info(&self) -> PruningInfo; } /// Extended client interface used for mining diff --git a/ethcore/src/types/mod.rs.in b/ethcore/src/types/mod.rs.in index 1d2cdb3c0..6034f8c59 100644 --- a/ethcore/src/types/mod.rs.in +++ b/ethcore/src/types/mod.rs.in @@ -33,4 +33,5 @@ pub mod transaction_import; pub mod block_import_error; pub mod restoration_status; pub mod snapshot_manifest; -pub mod mode; \ No newline at end of file +pub mod mode; +pub mod pruning_info; diff --git a/ethcore/src/types/pruning_info.rs b/ethcore/src/types/pruning_info.rs new file mode 100644 index 000000000..931ff95da --- /dev/null +++ b/ethcore/src/types/pruning_info.rs @@ -0,0 +1,29 @@ +// Copyright 2015, 2016 Parity Technologies (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Information about portions of the state and chain which the client may serve. +//! +//! Currently assumes that a client will store everything past a certain point +//! or everything. Will be extended in the future to support a definition +//! of which portions of the ancient chain and current state trie are stored as well. + +/// Client pruning info. See module-level docs for more details. +#[derive(Debug, Clone)] +#[cfg_attr(feature = "ipc", binary)] +pub struct PruningInfo { + /// Pruning history size + pub history_size: Option, +} diff --git a/ethcore/src/verification/queue/kind.rs b/ethcore/src/verification/queue/kind.rs index 17b997490..8c9e45510 100644 --- a/ethcore/src/verification/queue/kind.rs +++ b/ethcore/src/verification/queue/kind.rs @@ -19,18 +19,21 @@ use engines::Engine; use error::Error; -use util::{HeapSizeOf, H256}; +use util::{HeapSizeOf, H256, U256}; pub use self::blocks::Blocks; pub use self::headers::Headers; /// Something which can produce a hash and a parent hash. -pub trait HasHash { +pub trait BlockLike { /// Get the hash of this item. fn hash(&self) -> H256; /// Get the hash of this item's parent. fn parent_hash(&self) -> H256; + + /// Get the difficulty of this item. + fn difficulty(&self) -> U256; } /// Defines transitions between stages of verification. @@ -45,13 +48,13 @@ pub trait HasHash { /// consistent. pub trait Kind: 'static + Sized + Send + Sync { /// The first stage: completely unverified. - type Input: Sized + Send + HasHash + HeapSizeOf; + type Input: Sized + Send + BlockLike + HeapSizeOf; /// The second stage: partially verified. - type Unverified: Sized + Send + HasHash + HeapSizeOf; + type Unverified: Sized + Send + BlockLike + HeapSizeOf; /// The third stage: completely verified. - type Verified: Sized + Send + HasHash + HeapSizeOf; + type Verified: Sized + Send + BlockLike + HeapSizeOf; /// Attempt to create the `Unverified` item from the input. fn create(input: Self::Input, engine: &Engine) -> Result; @@ -62,14 +65,14 @@ pub trait Kind: 'static + Sized + Send + Sync { /// The blocks verification module. pub mod blocks { - use super::{Kind, HasHash}; + use super::{Kind, BlockLike}; use engines::Engine; use error::Error; use header::Header; use verification::{PreverifiedBlock, verify_block_basic, verify_block_unordered}; - use util::{Bytes, HeapSizeOf, H256}; + use util::{Bytes, HeapSizeOf, H256, U256}; /// A mode for verifying blocks. pub struct Blocks; @@ -126,7 +129,7 @@ pub mod blocks { } } - impl HasHash for Unverified { + impl BlockLike for Unverified { fn hash(&self) -> H256 { self.header.hash() } @@ -134,9 +137,13 @@ pub mod blocks { fn parent_hash(&self) -> H256 { self.header.parent_hash().clone() } + + fn difficulty(&self) -> U256 { + self.header.difficulty().clone() + } } - impl HasHash for PreverifiedBlock { + impl BlockLike for PreverifiedBlock { fn hash(&self) -> H256 { self.header.hash() } @@ -144,12 +151,16 @@ pub mod blocks { fn parent_hash(&self) -> H256 { self.header.parent_hash().clone() } + + fn difficulty(&self) -> U256 { + self.header.difficulty().clone() + } } } /// Verification for headers. pub mod headers { - use super::{Kind, HasHash}; + use super::{Kind, BlockLike}; use engines::Engine; use error::Error; @@ -157,10 +168,12 @@ pub mod headers { use verification::verify_header_params; use util::hash::H256; + use util::U256; - impl HasHash for Header { + impl BlockLike for Header { fn hash(&self) -> H256 { self.hash() } fn parent_hash(&self) -> H256 { self.parent_hash().clone() } + fn difficulty(&self) -> U256 { self.difficulty().clone() } } /// A mode for verifying headers. diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 99e09784d..d2f96ea80 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -26,7 +26,7 @@ use error::*; use engines::Engine; use service::*; -use self::kind::{HasHash, Kind}; +use self::kind::{BlockLike, Kind}; pub use types::verification_queue_info::VerificationQueueInfo as QueueInfo; @@ -101,9 +101,10 @@ pub struct VerificationQueue { deleting: Arc, ready_signal: Arc, empty: Arc, - processing: RwLock>, + processing: RwLock>, // hash to difficulty max_queue_size: usize, max_mem_use: usize, + total_difficulty: RwLock, } struct QueueSignal { @@ -214,10 +215,11 @@ impl VerificationQueue { verification: verification.clone(), verifiers: verifiers, deleting: deleting.clone(), - processing: RwLock::new(HashSet::new()), + processing: RwLock::new(HashMap::new()), empty: empty.clone(), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT), + total_difficulty: RwLock::new(0.into()), } } @@ -335,6 +337,7 @@ impl VerificationQueue { sizes.unverified.store(0, AtomicOrdering::Release); sizes.verifying.store(0, AtomicOrdering::Release); sizes.verified.store(0, AtomicOrdering::Release); + *self.total_difficulty.write() = 0.into(); self.processing.write().clear(); } @@ -349,7 +352,7 @@ impl VerificationQueue { /// Check if the item is currently in the queue pub fn status(&self, hash: &H256) -> Status { - if self.processing.read().contains(hash) { + if self.processing.read().contains_key(hash) { return Status::Queued; } if self.verification.bad.lock().contains(hash) { @@ -362,7 +365,7 @@ impl VerificationQueue { pub fn import(&self, input: K::Input) -> ImportResult { let h = input.hash(); { - if self.processing.read().contains(&h) { + if self.processing.read().contains_key(&h) { return Err(ImportError::AlreadyQueued.into()); } @@ -381,7 +384,11 @@ impl VerificationQueue { Ok(item) => { self.verification.sizes.unverified.fetch_add(item.heap_size_of_children(), AtomicOrdering::SeqCst); - self.processing.write().insert(h.clone()); + self.processing.write().insert(h.clone(), item.difficulty()); + { + let mut td = self.total_difficulty.write(); + *td = *td + item.difficulty(); + } self.verification.unverified.lock().push_back(item); self.more_to_verify.notify_all(); Ok(h) @@ -406,7 +413,10 @@ impl VerificationQueue { bad.reserve(hashes.len()); for hash in hashes { bad.insert(hash.clone()); - processing.remove(hash); + if let Some(difficulty) = processing.remove(hash) { + let mut td = self.total_difficulty.write(); + *td = *td - difficulty; + } } let mut new_verified = VecDeque::new(); @@ -415,7 +425,10 @@ impl VerificationQueue { if bad.contains(&output.parent_hash()) { removed_size += output.heap_size_of_children(); bad.insert(output.hash()); - processing.remove(&output.hash()); + if let Some(difficulty) = processing.remove(&output.hash()) { + let mut td = self.total_difficulty.write(); + *td = *td - difficulty; + } } else { new_verified.push_back(output); } @@ -433,7 +446,10 @@ impl VerificationQueue { } let mut processing = self.processing.write(); for hash in hashes { - processing.remove(hash); + if let Some(difficulty) = processing.remove(hash) { + let mut td = self.total_difficulty.write(); + *td = *td - difficulty; + } } processing.is_empty() } @@ -487,7 +503,13 @@ impl VerificationQueue { } } - /// Optimise memory footprint of the heap fields. + /// Get the total difficulty of all the blocks in the queue. + pub fn total_difficulty(&self) -> U256 { + self.total_difficulty.read().clone() + } + + /// Optimise memory footprint of the heap fields, and adjust the number of threads + /// to better suit the workload. pub fn collect_garbage(&self) { { self.verification.unverified.lock().shrink_to_fit(); @@ -569,6 +591,22 @@ mod tests { } } + #[test] + fn returns_total_difficulty() { + let queue = get_test_queue(); + let block = get_good_dummy_block(); + let hash = BlockView::new(&block).header().hash().clone(); + if let Err(e) = queue.import(Unverified::new(block)) { + panic!("error importing block that is valid by definition({:?})", e); + } + queue.flush(); + assert_eq!(queue.total_difficulty(), 131072.into()); + queue.drain(10); + assert_eq!(queue.total_difficulty(), 131072.into()); + queue.mark_as_good(&[ hash ]); + assert_eq!(queue.total_difficulty(), 0.into()); + } + #[test] fn returns_ok_for_drained_duplicates() { let queue = get_test_queue(); diff --git a/mac/Parity.pkgproj b/mac/Parity.pkgproj index 43e29565b..9d36961ac 100755 --- a/mac/Parity.pkgproj +++ b/mac/Parity.pkgproj @@ -578,7 +578,7 @@ OVERWRITE_PERMISSIONS VERSION - 1.4.6 + 1.4.7 UUID 2DCD5B81-7BAF-4DA1-9251-6274B089FD36 diff --git a/nsis/installer.nsi b/nsis/installer.nsi index 647ffe6c4..b8bf8e836 100644 --- a/nsis/installer.nsi +++ b/nsis/installer.nsi @@ -10,7 +10,7 @@ !define DESCRIPTION "Fast, light, robust Ethereum implementation" !define VERSIONMAJOR 1 !define VERSIONMINOR 4 -!define VERSIONBUILD 6 +!define VERSIONBUILD 7 !define ARGS "--warp" !define FIRST_START_ARGS "ui --warp --mode=passive" diff --git a/sync/src/block_sync.rs b/sync/src/block_sync.rs index eea977906..de9d80ed5 100644 --- a/sync/src/block_sync.rs +++ b/sync/src/block_sync.rs @@ -32,9 +32,8 @@ const MAX_HEADERS_TO_REQUEST: usize = 128; const MAX_BODIES_TO_REQUEST: usize = 64; const MAX_RECEPITS_TO_REQUEST: usize = 128; const SUBCHAIN_SIZE: u64 = 256; -const MAX_ROUND_PARENTS: usize = 32; +const MAX_ROUND_PARENTS: usize = 16; const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5; -const MAX_REORG_BLOCKS: u64 = 20; #[derive(Copy, Clone, Eq, PartialEq, Debug)] /// Downloader state @@ -95,27 +94,38 @@ pub struct BlockDownloader { last_imported_hash: H256, /// Number of blocks imported this round imported_this_round: Option, + /// Block number the last round started with. + last_round_start: BlockNumber, + last_round_start_hash: H256, /// Block parents imported this round (hash, parent) round_parents: VecDeque<(H256, H256)>, /// Do we need to download block recetips. download_receipts: bool, /// Sync up to the block with this hash. target_hash: Option, + /// Reorganize up to this many blocks. Up to genesis if `None`, + max_reorg_blocks: Option, + /// Probing range for seeking common best block. + retract_step: u64, } impl BlockDownloader { /// Create a new instance of syncing strategy. - pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> BlockDownloader { + pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber, max_reorg: Option) -> BlockDownloader { BlockDownloader { state: State::Idle, highest_block: None, last_imported_block: start_number, last_imported_hash: start_hash.clone(), + last_round_start: start_number, + last_round_start_hash: start_hash.clone(), blocks: BlockCollection::new(sync_receipts), imported_this_round: None, round_parents: VecDeque::new(), download_receipts: sync_receipts, target_hash: None, + max_reorg_blocks: max_reorg, + retract_step: 1, } } @@ -127,9 +137,12 @@ impl BlockDownloader { /// Mark a block as known in the chain pub fn mark_as_known(&mut self, hash: &H256, number: BlockNumber) { - if number == self.last_imported_block + 1 { + if number >= self.last_imported_block + 1 { self.last_imported_block = number; self.last_imported_hash = hash.clone(); + self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + 1); + self.last_round_start = number; + self.last_round_start_hash = hash.clone(); } } @@ -148,12 +161,6 @@ impl BlockDownloader { self.target_hash = Some(hash.clone()); } - /// Set starting sync block - pub fn _set_start(&mut self, hash: &H256, number: BlockNumber) { - self.last_imported_hash = hash.clone(); - self.last_imported_block = number; - } - /// Unmark header as being downloaded. pub fn clear_header_download(&mut self, hash: &H256) { self.blocks.clear_header_download(hash) @@ -172,6 +179,7 @@ impl BlockDownloader { pub fn reset_to(&mut self, hashes: Vec) { self.reset(); self.blocks.reset_to(hashes); + self.state = State::Blocks; } /// Returns used heap memory size. @@ -260,7 +268,7 @@ impl BlockDownloader { return Ok(DownloadAction::Reset); } else { let best = io.chain().chain_info().best_block_number; - if best > self.last_imported_block && best - self.last_imported_block > MAX_REORG_BLOCKS { + if best > self.last_imported_block && (self.last_imported_block == 0 || best - self.last_imported_block > self.max_reorg_blocks.unwrap_or(u64::max_value())) { trace!(target: "sync", "No common block, disabling peer"); return Err(BlockDownloaderImportError::Invalid); } @@ -336,39 +344,47 @@ impl BlockDownloader { fn start_sync_round(&mut self, io: &mut SyncIo) { self.state = State::ChainHead; - trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block); + trace!(target: "sync", "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block); // Check if need to retract to find the common block. The problem is that the peers still return headers by hash even // from the non-canonical part of the tree. So we also retract if nothing has been imported last round. + let start = self.last_round_start; + let start_hash = self.last_round_start_hash; match self.imported_this_round { - Some(n) if n == 0 && self.last_imported_block > 0 => { + Some(n) if n == 0 && start > 0 => { // nothing was imported last round, step back to a previous block // search parent in last round known parents first - if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == self.last_imported_hash) { - self.last_imported_block -= 1; + if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == start_hash) { + self.last_imported_block = start - 1; self.last_imported_hash = p.clone(); trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash); } else { let best = io.chain().chain_info().best_block_number; - if best > self.last_imported_block && best - self.last_imported_block > MAX_REORG_BLOCKS { - debug!(target: "sync", "Could not revert to previous ancient block, last: {} ({})", self.last_imported_block, self.last_imported_hash); + if best > start && (start == 0 || best - start > self.max_reorg_blocks.unwrap_or(u64::max_value())) { + debug!(target: "sync", "Could not revert to previous ancient block, last: {} ({})", start, start_hash); self.reset(); } else { - match io.chain().block_hash(BlockID::Number(self.last_imported_block - 1)) { + let n = start - min(self.retract_step, start); + self.retract_step *= 2; + match io.chain().block_hash(BlockID::Number(n)) { Some(h) => { - self.last_imported_block -= 1; + self.last_imported_block = n; self.last_imported_hash = h; - trace!(target: "sync", "Searching common header in the blockchain {} ({})", self.last_imported_block, self.last_imported_hash); + trace!(target: "sync", "Searching common header in the blockchain {} ({})", start, self.last_imported_hash); } None => { - debug!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash); + debug!(target: "sync", "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash); self.reset(); } } } } }, - _ => (), + _ => { + self.retract_step = 1; + }, } + self.last_round_start = self.last_imported_block; + self.last_round_start_hash = self.last_imported_hash; self.imported_this_round = None; } @@ -474,6 +490,9 @@ impl BlockDownloader { self.block_imported(&h, number, &parent); }, Err(BlockImportError::Block(BlockError::UnknownParent(_))) if allow_out_of_order => { + break; + }, + Err(BlockImportError::Block(BlockError::UnknownParent(_))) => { trace!(target: "sync", "Unknown new block parent, restarting sync"); break; }, diff --git a/sync/src/chain.rs b/sync/src/chain.rs index ffd89ecdd..c92aa4aaa 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -94,7 +94,7 @@ use rlp::*; use network::*; use ethcore::views::{HeaderView}; use ethcore::header::{BlockNumber, Header as BlockHeader}; -use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo, BlockImportError}; +use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo, BlockImportError, BlockQueueInfo}; use ethcore::error::*; use ethcore::snapshot::{ManifestData, RestorationStatus}; use sync_io::SyncIo; @@ -231,6 +231,13 @@ impl SyncStatus { min_peers } } + + /// Is it doing a major sync? + pub fn is_syncing(&self, queue_info: BlockQueueInfo) -> bool { + let is_syncing_state = match self.state { SyncState::Idle | SyncState::NewBlocks => false, _ => true }; + let is_verifying = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3; + is_verifying || is_syncing_state + } } #[derive(PartialEq, Eq, Debug, Clone)] @@ -357,6 +364,7 @@ impl ChainSync { /// Create a new instance of syncing strategy. pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync { let chain_info = chain.chain_info(); + let pruning = chain.pruning_info(); let mut sync = ChainSync { state: if config.warp_sync { SyncState::WaitingPeers } else { SyncState::Idle }, starting_block: chain.chain_info().best_block_number, @@ -364,7 +372,7 @@ impl ChainSync { peers: HashMap::new(), handshaking_peers: HashMap::new(), active_peers: HashSet::new(), - new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number), + new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number, pruning.history_size), old_blocks: None, last_sent_block_number: 0, network_id: config.network_id, @@ -430,6 +438,7 @@ impl ChainSync { fn reset(&mut self, io: &mut SyncIo) { self.new_blocks.reset(); self.snapshot.clear(); + let chain_info = io.chain().chain_info(); if self.state == SyncState::SnapshotData { debug!(target:"sync", "Aborting snapshot restore"); io.snapshot_service().abort_restore(); @@ -437,6 +446,10 @@ impl ChainSync { for (_, ref mut p) in &mut self.peers { if p.block_set != Some(BlockSet::OldBlocks) { p.reset_asking(); + if p.difficulty.is_none() { + // assume peer has up to date difficulty + p.difficulty = Some(chain_info.pending_total_difficulty); + } } } self.state = SyncState::Idle; @@ -528,19 +541,19 @@ impl ChainSync { /// Update sync after the blockchain has been changed externally. pub fn update_targets(&mut self, chain: &BlockChainClient) { // Do not assume that the block queue/chain still has our last_imported_block + let pruning = chain.pruning_info(); let chain = chain.chain_info(); - self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number); + self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number, pruning.history_size); + self.old_blocks = None; if let (Some(ancient_block_hash), Some(ancient_block_number)) = (chain.ancient_block_hash, chain.ancient_block_number) { trace!(target: "sync", "Downloading old blocks from {:?} (#{}) till {:?} (#{:?})", ancient_block_hash, ancient_block_number, chain.first_block_hash, chain.first_block_number); - let mut downloader = BlockDownloader::new(true, &ancient_block_hash, ancient_block_number); + let mut downloader = BlockDownloader::new(true, &ancient_block_hash, ancient_block_number, pruning.history_size); if let Some(hash) = chain.first_block_hash { trace!(target: "sync", "Downloader target set to {:?}", hash); downloader.set_target(&hash); } self.old_blocks = Some(downloader); - } else { - self.old_blocks = None; } } @@ -828,6 +841,12 @@ impl ChainSync { trace!(target: "sync", "Ignoring new block from unconfirmed peer {}", peer_id); return Ok(()); } + let difficulty: U256 = try!(r.val_at(1)); + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + if peer.difficulty.map_or(true, |pd| difficulty > pd) { + peer.difficulty = Some(difficulty); + } + } let block_rlp = try!(r.at(0)); let header_rlp = try!(block_rlp.at(0)); let h = header_rlp.as_raw().sha3(); @@ -856,6 +875,8 @@ impl ChainSync { trace!(target: "sync", "New block already queued {:?}", h); }, Ok(_) => { + // abort current download of the same block + self.complete_sync(io); self.new_blocks.mark_as_known(&header.hash(), header.number()); trace!(target: "sync", "New block queued {:?} ({})", h, header.number()); }, @@ -874,16 +895,10 @@ impl ChainSync { } else { trace!(target: "sync", "New unknown block {:?}", h); //TODO: handle too many unknown blocks - let difficulty: U256 = try!(r.val_at(1)); - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { - if peer.difficulty.map_or(true, |pd| difficulty > pd) { - peer.difficulty = Some(difficulty); - trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); - } - } self.sync_peer(io, peer_id, true); } } + self.continue_sync(io); Ok(()) } @@ -893,16 +908,24 @@ impl ChainSync { trace!(target: "sync", "Ignoring new hashes from unconfirmed peer {}", peer_id); return Ok(()); } + let hashes: Vec<_> = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::(0), item.val_at::(1))).collect(); + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + // Peer has new blocks with unknown difficulty + peer.difficulty = None; + if let Some(&(Ok(ref h), _)) = hashes.last() { + peer.latest_hash = h.clone(); + } + } if self.state != SyncState::Idle { trace!(target: "sync", "Ignoring new hashes since we're already downloading."); let max = r.iter().take(MAX_NEW_HASHES).map(|item| item.val_at::(1).unwrap_or(0)).fold(0u64, max); if max > self.highest_block.unwrap_or(0) { self.highest_block = Some(max); } + self.continue_sync(io); return Ok(()); } trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count()); - let hashes = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::(0), item.val_at::(1))); let mut max_height: BlockNumber = 0; let mut new_hashes = Vec::new(); let last_imported_number = self.new_blocks.last_imported_block_number(); @@ -950,6 +973,7 @@ impl ChainSync { self.state = SyncState::NewBlocks; self.sync_peer(io, peer_id, true); } + self.continue_sync(io); Ok(()) } @@ -1074,7 +1098,7 @@ impl ChainSync { thread_rng().shuffle(&mut peers); //TODO: sort by rating // prefer peers with higher protocol version peers.sort_by(|&(_, _, ref v1), &(_, _, ref v2)| v1.cmp(v2)); - trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len()); + trace!(target: "sync", "Syncing with peers: {} active, {} confirmed, {} total", self.active_peers.len(), peers.len(), self.peers.len()); for (p, _, _) in peers { if self.active_peers.contains(&p) { self.sync_peer(io, p, false); @@ -1103,12 +1127,13 @@ impl ChainSync { /// Find something to do for a peer. Called for a new peer or when a peer is done with its task. fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) { if !self.active_peers.contains(&peer_id) { - trace!(target: "sync", "Skipping deactivated peer"); + trace!(target: "sync", "Skipping deactivated peer {}", peer_id); return; } let (peer_latest, peer_difficulty, peer_snapshot_number, peer_snapshot_hash) = { if let Some(peer) = self.peers.get_mut(&peer_id) { if peer.asking != PeerAsking::Nothing || !peer.can_sync() { + trace!(target: "sync", "Skipping busy peer {}", peer_id); return; } if self.state == SyncState::Waiting { @@ -1129,7 +1154,7 @@ impl ChainSync { let num_active_peers = self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(); let higher_difficulty = peer_difficulty.map_or(true, |pd| pd > syncing_difficulty); - if force || self.state == SyncState::NewBlocks || higher_difficulty || self.old_blocks.is_some() { + if force || higher_difficulty || self.old_blocks.is_some() { match self.state { SyncState::WaitingPeers => { trace!(target: "sync", "Checking snapshot sync: {} vs {}", peer_snapshot_number, chain_info.best_block_number); @@ -1142,9 +1167,10 @@ impl ChainSync { } let have_latest = io.chain().block_status(BlockID::Hash(peer_latest)) != BlockStatus::Unknown; + trace!(target: "sync", "Considering peer {}, force={}, td={:?}, our td={}, latest={}, have_latest={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, peer_latest, have_latest, self.state); if !have_latest && (higher_difficulty || force || self.state == SyncState::NewBlocks) { // check if got new blocks to download - trace!(target: "sync", "Syncing with {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state); + trace!(target: "sync", "Syncing with peer {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state); if let Some(request) = self.new_blocks.request_blocks(io, num_active_peers) { self.request_blocks(io, peer_id, request, BlockSet::NewBlocks); if self.state == SyncState::Idle { @@ -1174,6 +1200,8 @@ impl ChainSync { SyncState::SnapshotManifest | //already downloading from other peer SyncState::Waiting | SyncState::SnapshotWaiting => () } + } else { + trace!(target: "sync", "Skipping peer {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state); } } @@ -1958,7 +1986,9 @@ impl ChainSync { /// called when block is imported to chain - propagates the blocks and updates transactions sent to peers pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256], sealed: &[H256]) { - if io.is_chain_queue_empty() { + let queue_info = io.chain().queue_info(); + if !self.status().is_syncing(queue_info) || !sealed.is_empty() { + trace!(target: "sync", "Propagating blocks, state={:?}", self.state); self.propagate_latest_blocks(io, sealed); } if !invalid.is_empty() { diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 7705215f5..1b45c4c18 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -255,8 +255,12 @@ fn high_td_attach() { fn disconnect_on_unrelated_chain() { ::env_logger::init().ok(); let mut net = TestNet::new(2); - net.peer_mut(0).chain.add_blocks(200, EachBlockWith::Uncle); - net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Nothing); + net.peer(0).chain.set_history(Some(20)); + net.peer(1).chain.set_history(Some(20)); + net.restart_peer(0); + net.restart_peer(1); + net.peer(0).chain.add_blocks(500, EachBlockWith::Uncle); + net.peer(1).chain.add_blocks(300, EachBlockWith::Nothing); net.sync(); assert_eq!(net.disconnect_events, vec![(0, 0)]); } diff --git a/util/Cargo.toml b/util/Cargo.toml index aa9cf4610..c55ba6ddf 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -3,7 +3,7 @@ description = "Ethcore utility library" homepage = "http://ethcore.io" license = "GPL-3.0" name = "ethcore-util" -version = "1.4.6" +version = "1.4.7" authors = ["Ethcore "] build = "build.rs"