diff --git a/Cargo.lock b/Cargo.lock index 0cbb7838c..cfd7ba4d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2464,7 +2464,7 @@ dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "jni 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)", "panic_hook 0.1.0", - "parity-ethereum 2.4.7", + "parity-ethereum 2.4.8", "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-current-thread 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2494,7 +2494,7 @@ dependencies = [ [[package]] name = "parity-ethereum" -version = "2.4.7" +version = "2.4.8" dependencies = [ "ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)", "atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2547,7 +2547,7 @@ dependencies = [ "parity-rpc 1.12.0", "parity-runtime 0.1.0", "parity-updater 1.12.0", - "parity-version 2.4.7", + "parity-version 2.4.8", "parity-whisper 0.1.0", "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "pretty_assertions 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2698,7 +2698,7 @@ dependencies = [ "parity-crypto 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-runtime 0.1.0", "parity-updater 1.12.0", - "parity-version 2.4.7", + "parity-version 2.4.8", "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "pretty_assertions 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2797,7 +2797,7 @@ dependencies = [ "parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-hash-fetch 1.12.0", "parity-path 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "parity-version 2.4.7", + "parity-version 2.4.8", "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2807,7 +2807,7 @@ dependencies = [ [[package]] name = "parity-version" -version = "2.4.7" +version = "2.4.8" dependencies = [ "parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "rlp 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index cbc118c24..25247500b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ description = "Parity Ethereum client" name = "parity-ethereum" # NOTE Make sure to update util/version/Cargo.toml as well -version = "2.4.7" +version = "2.4.8" license = "GPL-3.0" authors = ["Parity Technologies "] diff --git a/ethcore/blockchain/src/blockchain.rs b/ethcore/blockchain/src/blockchain.rs index de6e8c134..7ee735f78 100644 --- a/ethcore/blockchain/src/blockchain.rs +++ b/ethcore/blockchain/src/blockchain.rs @@ -668,21 +668,6 @@ impl BlockChain { self.db.key_value().read_with_cache(db::COL_EXTRA, &self.block_details, parent).map_or(false, |d| d.children.contains(hash)) } - /// fetches the list of blocks from best block to n, and n's parent hash - /// where n > 0 - pub fn block_headers_from_best_block(&self, n: u32) -> Option<(Vec, H256)> { - let mut blocks = Vec::with_capacity(n as usize); - let mut hash = self.best_block_hash(); - - for _ in 0..n { - let current_hash = self.block_header_data(&hash)?; - hash = current_hash.parent_hash(); - blocks.push(current_hash); - } - - Some((blocks, hash)) - } - /// Returns a tree route between `from` and `to`, which is a tuple of: /// /// - a vector of hashes of all blocks, ordered from `from` to `to`. @@ -869,6 +854,14 @@ impl BlockChain { } } + /// clears all caches for testing purposes + pub fn clear_cache(&self) { + self.block_bodies.write().clear(); + self.block_details.write().clear(); + self.block_hashes.write().clear(); + self.block_headers.write().clear(); + } + /// Update the best ancient block to the given hash, after checking that /// it's directly linked to the currently known best ancient block pub fn update_best_ancient_block(&self, hash: &H256) { diff --git a/ethcore/res/ethereum/classic.json b/ethcore/res/ethereum/classic.json index 682b75299..2750f6f63 100644 --- a/ethcore/res/ethereum/classic.json +++ b/ethcore/res/ethereum/classic.json @@ -12,7 +12,7 @@ "ecip1010PauseTransition": "0x2dc6c0", "ecip1010ContinueTransition": "0x4c4b40", "ecip1017EraRounds": "0x4c4b40", - "eip100bTransition": "0x7fffffffffffffff", + "eip100bTransition": "0x85d9a0", "bombDefuseTransition": "0x5a06e0" } } @@ -29,15 +29,15 @@ "forkCanonHash": "0x94365e3a8c0b35089c1d1195081fe7489b528a84b22199c916180db8b28ade7f", "eip150Transition": "0x2625a0", "eip160Transition": "0x2dc6c0", - "eip161abcTransition": "0x7fffffffffffffff", - "eip161dTransition": "0x7fffffffffffffff", + "eip161abcTransition": "0x85d9a0", + "eip161dTransition": "0x85d9a0", "eip155Transition": "0x2dc6c0", "maxCodeSize": "0x6000", - "maxCodeSizeTransition": "0x7fffffffffffffff", - "eip140Transition": "0x7fffffffffffffff", - "eip211Transition": "0x7fffffffffffffff", - "eip214Transition": "0x7fffffffffffffff", - "eip658Transition": "0x7fffffffffffffff" + "maxCodeSizeTransition": "0x85d9a0", + "eip140Transition": "0x85d9a0", + "eip211Transition": "0x85d9a0", + "eip214Transition": "0x85d9a0", + "eip658Transition": "0x85d9a0" }, "genesis": { "seal": { @@ -3905,7 +3905,7 @@ "0x0000000000000000000000000000000000000005": { "builtin": { "name": "modexp", - "activate_at": "0x7fffffffffffffff", + "activate_at": "0x85d9a0", "pricing": { "modexp": { "divisor": 20 @@ -3916,7 +3916,7 @@ "0x0000000000000000000000000000000000000006": { "builtin": { "name": "alt_bn128_add", - "activate_at": "0x7fffffffffffffff", + "activate_at": "0x85d9a0", "pricing": { "linear": { "base": 500, @@ -3928,7 +3928,7 @@ "0x0000000000000000000000000000000000000007": { "builtin": { "name": "alt_bn128_mul", - "activate_at": "0x7fffffffffffffff", + "activate_at": "0x85d9a0", "pricing": { "linear": { "base": 40000, @@ -3940,7 +3940,7 @@ "0x0000000000000000000000000000000000000008": { "builtin": { "name": "alt_bn128_pairing", - "activate_at": "0x7fffffffffffffff", + "activate_at": "0x85d9a0", "pricing": { "alt_bn128_pairing": { "base": 100000, diff --git a/ethcore/service/src/service.rs b/ethcore/service/src/service.rs index c16a07189..095397037 100644 --- a/ethcore/service/src/service.rs +++ b/ethcore/service/src/service.rs @@ -30,8 +30,10 @@ use blockchain::{BlockChainDB, BlockChainDBHandler}; use ethcore::client::{Client, ClientConfig, ChainNotify, ClientIoMessage}; use ethcore::miner::Miner; use ethcore::snapshot::service::{Service as SnapshotService, ServiceParams as SnapServiceParams}; -use ethcore::snapshot::{SnapshotService as _SnapshotService, RestorationStatus}; +use ethcore::snapshot::{SnapshotService as _SnapshotService, RestorationStatus, Error as SnapshotError}; use ethcore::spec::Spec; +use ethcore::error::{Error as EthcoreError, ErrorKind}; + use ethcore_private_tx::{self, Importer, Signer}; use Error; @@ -197,6 +199,7 @@ impl ClientService { /// Shutdown the Client Service pub fn shutdown(&self) { + trace!(target: "shutdown", "Shutting down Client Service"); self.snapshot.shutdown(); } } @@ -257,7 +260,11 @@ impl IoHandler for ClientIoHandler { let res = thread::Builder::new().name("Periodic Snapshot".into()).spawn(move || { if let Err(e) = snapshot.take_snapshot(&*client, num) { - warn!("Failed to take snapshot at block #{}: {}", num, e); + match e { + EthcoreError(ErrorKind::Snapshot(SnapshotError::SnapshotAborted), _) => info!("Snapshot aborted"), + _ => warn!("Failed to take snapshot at block #{}: {}", num, e), + } + } }); diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 0be48a636..c51cb8251 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -26,7 +26,7 @@ use bytes::Bytes; use call_contract::{CallContract, RegistryInfo}; use ethcore_miner::pool::VerifiedTransaction; use ethcore_miner::service_transaction_checker::ServiceTransactionChecker; -use ethereum_types::{H256, Address, U256}; +use ethereum_types::{H256, H264, Address, U256}; use evm::Schedule; use hash::keccak; use io::IoChannel; @@ -87,7 +87,7 @@ pub use types::blockchain_info::BlockChainInfo; pub use types::block_status::BlockStatus; pub use blockchain::CacheSize as BlockChainCacheSize; pub use verification::QueueInfo as BlockQueueInfo; -use db::Writable; +use db::{Writable, Readable, keys::BlockDetails}; use_contract!(registry, "res/contracts/registrar.json"); @@ -772,8 +772,8 @@ impl Client { liveness: AtomicBool::new(awake), mode: Mutex::new(config.mode.clone()), chain: RwLock::new(chain), - tracedb: tracedb, - engine: engine, + tracedb, + engine, pruning: config.pruning.clone(), db: RwLock::new(db.clone()), state_db: RwLock::new(state_db), @@ -786,8 +786,8 @@ impl Client { ancient_blocks_import_lock: Default::default(), queue_consensus_message: IoChannelQueue::new(usize::max_value()), last_hashes: RwLock::new(VecDeque::new()), - factories: factories, - history: history, + factories, + history, on_user_defaults_change: Mutex::new(None), registrar_address, exit_handler: Mutex::new(None), @@ -1146,7 +1146,12 @@ impl Client { /// Take a snapshot at the given block. /// If the ID given is "latest", this will default to 1000 blocks behind. - pub fn take_snapshot(&self, writer: W, at: BlockId, p: &snapshot::Progress) -> Result<(), EthcoreError> { + pub fn take_snapshot( + &self, + writer: W, + at: BlockId, + p: &snapshot::Progress, + ) -> Result<(), EthcoreError> { let db = self.state_db.read().journal_db().boxed_clone(); let best_block_number = self.chain_info().best_block_number; let block_number = self.block_number(at).ok_or(snapshot::Error::InvalidStartingBlock(at))?; @@ -1176,8 +1181,16 @@ impl Client { }; let processing_threads = self.config.snapshot.processing_threads; - snapshot::take_snapshot(&*self.engine, &self.chain.read(), start_hash, db.as_hash_db(), writer, p, processing_threads)?; - + let chunker = self.engine.snapshot_components().ok_or(snapshot::Error::SnapshotsUnsupported)?; + snapshot::take_snapshot( + chunker, + &self.chain.read(), + start_hash, + db.as_hash_db(), + writer, + p, + processing_threads, + )?; Ok(()) } @@ -1335,37 +1348,60 @@ impl BlockChainReset for Client { fn reset(&self, num: u32) -> Result<(), String> { if num as u64 > self.pruning_history() { return Err("Attempting to reset to block with pruned state".into()) + } else if num == 0 { + return Err("invalid number of blocks to reset".into()) } - let (blocks_to_delete, best_block_hash) = self.chain.read() - .block_headers_from_best_block(num) - .ok_or("Attempted to reset past genesis block")?; + let mut blocks_to_delete = Vec::with_capacity(num as usize); + let mut best_block_hash = self.chain.read().best_block_hash(); + let mut batch = DBTransaction::with_capacity(blocks_to_delete.capacity()); - let mut db_transaction = DBTransaction::with_capacity((num + 1) as usize); + for _ in 0..num { + let current_header = self.chain.read().block_header_data(&best_block_hash) + .expect("best_block_hash was fetched from db; block_header_data should exist in db; qed"); + best_block_hash = current_header.parent_hash(); - for hash in &blocks_to_delete { - db_transaction.delete(::db::COL_HEADERS, &hash.hash()); - db_transaction.delete(::db::COL_BODIES, &hash.hash()); - db_transaction.delete(::db::COL_EXTRA, &hash.hash()); + let (number, hash) = (current_header.number(), current_header.hash()); + batch.delete(::db::COL_HEADERS, &hash); + batch.delete(::db::COL_BODIES, &hash); + Writable::delete:: + (&mut batch, ::db::COL_EXTRA, &hash); Writable::delete:: - (&mut db_transaction, ::db::COL_EXTRA, &hash.number()); + (&mut batch, ::db::COL_EXTRA, &number); + + blocks_to_delete.push((number, hash)); } + let hashes = blocks_to_delete.iter().map(|(_, hash)| hash).collect::>(); + info!("Deleting block hashes {}", + Colour::Red + .bold() + .paint(format!("{:#?}", hashes)) + ); + + let mut best_block_details = Readable::read::( + &**self.db.read().key_value(), + ::db::COL_EXTRA, + &best_block_hash + ).expect("block was previously imported; best_block_details should exist; qed"); + + let (_, last_hash) = blocks_to_delete.last() + .expect("num is > 0; blocks_to_delete can't be empty; qed"); + // remove the last block as a child so that it can be re-imported + // ethcore/blockchain/src/blockchain.rs/Blockchain::is_known_child() + best_block_details.children.retain(|h| *h != *last_hash); + batch.write( + ::db::COL_EXTRA, + &best_block_hash, + &best_block_details + ); // update the new best block hash - db_transaction.put(::db::COL_EXTRA, b"best", &*best_block_hash); + batch.put(::db::COL_EXTRA, b"best", &best_block_hash); self.db.read() .key_value() - .write(db_transaction) - .map_err(|err| format!("could not complete reset operation; io error occured: {}", err))?; - - let hashes = blocks_to_delete.iter().map(|b| b.hash()).collect::>(); - - info!("Deleting block hashes {}", - Colour::Red - .bold() - .paint(format!("{:#?}", hashes)) - ); + .write(batch) + .map_err(|err| format!("could not delete blocks; io error occurred: {}", err))?; info!("New best block hash {}", Colour::Green.bold().paint(format!("{:?}", best_block_hash))); diff --git a/ethcore/src/engines/authority_round/mod.rs b/ethcore/src/engines/authority_round/mod.rs index 400cb54dc..d5612ee42 100644 --- a/ethcore/src/engines/authority_round/mod.rs +++ b/ethcore/src/engines/authority_round/mod.rs @@ -22,7 +22,7 @@ use std::iter::FromIterator; use std::ops::Deref; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::sync::{Weak, Arc}; -use std::time::{UNIX_EPOCH, SystemTime, Duration}; +use std::time::{UNIX_EPOCH, Duration}; use block::*; use client::EngineClient; @@ -42,14 +42,12 @@ use itertools::{self, Itertools}; use rlp::{encode, Decodable, DecoderError, Encodable, RlpStream, Rlp}; use ethereum_types::{H256, H520, Address, U128, U256}; use parking_lot::{Mutex, RwLock}; +use time_utils::CheckedSystemTime; use types::BlockNumber; use types::header::{Header, ExtendedHeader}; use types::ancestry_action::AncestryAction; use unexpected::{Mismatch, OutOfBounds}; -#[cfg(not(time_checked_add))] -use time_utils::CheckedSystemTime; - mod finality; /// `AuthorityRound` params. @@ -574,10 +572,10 @@ fn verify_timestamp(step: &Step, header_step: u64) -> Result<(), BlockError> { // Returning it further won't recover the sync process. trace!(target: "engine", "verify_timestamp: block too early"); - let now = SystemTime::now(); - let found = now.checked_add(Duration::from_secs(oob.found)).ok_or(BlockError::TimestampOverflow)?; - let max = oob.max.and_then(|m| now.checked_add(Duration::from_secs(m))); - let min = oob.min.and_then(|m| now.checked_add(Duration::from_secs(m))); + let found = CheckedSystemTime::checked_add(UNIX_EPOCH, Duration::from_secs(oob.found)) + .ok_or(BlockError::TimestampOverflow)?; + let max = oob.max.and_then(|m| CheckedSystemTime::checked_add(UNIX_EPOCH, Duration::from_secs(m))); + let min = oob.min.and_then(|m| CheckedSystemTime::checked_add(UNIX_EPOCH, Duration::from_secs(m))); let new_oob = OutOfBounds { min, max, found }; diff --git a/ethcore/src/externalities.rs b/ethcore/src/externalities.rs index a2f35b6de..798230644 100644 --- a/ethcore/src/externalities.rs +++ b/ethcore/src/externalities.rs @@ -314,7 +314,11 @@ impl<'a, T: 'a, V: 'a, B: 'a> Ext for Externalities<'a, T, V, B> } fn extcodehash(&self, address: &Address) -> vm::Result> { - Ok(self.state.code_hash(address)?) + if self.state.exists_and_not_null(address)? { + Ok(self.state.code_hash(address)?) + } else { + Ok(None) + } } fn extcodesize(&self, address: &Address) -> vm::Result> { diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index e6129feb2..636466fdd 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -15,7 +15,6 @@ // along with Parity Ethereum. If not, see . #![warn(missing_docs, unused_extern_crates)] -#![cfg_attr(feature = "time_checked_add", feature(time_checked_add))] //! Ethcore library //! @@ -101,6 +100,7 @@ extern crate rlp; extern crate rustc_hex; extern crate serde; extern crate stats; +extern crate time_utils; extern crate triehash_ethereum as triehash; extern crate unexpected; extern crate using_queue; @@ -150,9 +150,6 @@ extern crate fetch; #[cfg(all(test, feature = "price-info"))] extern crate parity_runtime; -#[cfg(not(time_checked_add))] -extern crate time_utils; - pub mod block; pub mod builtin; pub mod client; diff --git a/ethcore/src/snapshot/account.rs b/ethcore/src/snapshot/account.rs index ed56e2435..2a9ac911f 100644 --- a/ethcore/src/snapshot/account.rs +++ b/ethcore/src/snapshot/account.rs @@ -24,9 +24,10 @@ use ethtrie::{TrieDB, TrieDBMut}; use hash::{KECCAK_EMPTY, KECCAK_NULL_RLP}; use hash_db::HashDB; use rlp::{RlpStream, Rlp}; -use snapshot::Error; +use snapshot::{Error, Progress}; use std::collections::HashSet; use trie::{Trie, TrieMut}; +use std::sync::atomic::Ordering; // An empty account -- these were replaced with RLP null data for a space optimization in v1. const ACC_EMPTY: BasicAccount = BasicAccount { @@ -65,8 +66,16 @@ impl CodeState { // walk the account's storage trie, returning a vector of RLP items containing the // account address hash, account properties and the storage. Each item contains at most `max_storage_items` // storage records split according to snapshot format definition. -pub fn to_fat_rlps(account_hash: &H256, acc: &BasicAccount, acct_db: &AccountDB, used_code: &mut HashSet, first_chunk_size: usize, max_chunk_size: usize) -> Result, Error> { - let db = &(acct_db as &HashDB<_,_>); +pub fn to_fat_rlps( + account_hash: &H256, + acc: &BasicAccount, + acct_db: &AccountDB, + used_code: &mut HashSet, + first_chunk_size: usize, + max_chunk_size: usize, + p: &Progress, +) -> Result, Error> { + let db = &(acct_db as &dyn HashDB<_,_>); let db = TrieDB::new(db, &acc.storage_root)?; let mut chunks = Vec::new(); let mut db_iter = db.iter()?; @@ -112,6 +121,10 @@ pub fn to_fat_rlps(account_hash: &H256, acc: &BasicAccount, acct_db: &AccountDB, } loop { + if p.abort.load(Ordering::SeqCst) { + trace!(target: "snapshot", "to_fat_rlps: aborting snapshot"); + return Err(Error::SnapshotAborted); + } match db_iter.next() { Some(Ok((k, v))) => { let pair = { @@ -211,6 +224,7 @@ mod tests { use types::basic_account::BasicAccount; use test_helpers::get_temp_state_db; use snapshot::tests::helpers::fill_storage; + use snapshot::Progress; use hash::{KECCAK_EMPTY, KECCAK_NULL_RLP, keccak}; use ethereum_types::{H256, Address}; @@ -236,8 +250,8 @@ mod tests { let thin_rlp = ::rlp::encode(&account); assert_eq!(::rlp::decode::(&thin_rlp).unwrap(), account); - - let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value()).unwrap(); + let p = Progress::default(); + let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap(); let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap(); assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &addr), fat_rlp, H256::zero()).unwrap().0, account); } @@ -262,7 +276,9 @@ mod tests { let thin_rlp = ::rlp::encode(&account); assert_eq!(::rlp::decode::(&thin_rlp).unwrap(), account); - let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value()).unwrap(); + let p = Progress::default(); + + let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap(); let fat_rlp = Rlp::new(&fat_rlp[0]).at(1).unwrap(); assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &addr), fat_rlp, H256::zero()).unwrap().0, account); } @@ -287,7 +303,8 @@ mod tests { let thin_rlp = ::rlp::encode(&account); assert_eq!(::rlp::decode::(&thin_rlp).unwrap(), account); - let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), 500, 1000).unwrap(); + let p = Progress::default(); + let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), 500, 1000, &p).unwrap(); let mut root = KECCAK_NULL_RLP; let mut restored_account = None; for rlp in fat_rlps { @@ -319,20 +336,21 @@ mod tests { nonce: 50.into(), balance: 123456789.into(), storage_root: KECCAK_NULL_RLP, - code_hash: code_hash, + code_hash, }; let account2 = BasicAccount { nonce: 400.into(), balance: 98765432123456789usize.into(), storage_root: KECCAK_NULL_RLP, - code_hash: code_hash, + code_hash, }; let mut used_code = HashSet::new(); - - let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::new(db.as_hash_db(), &addr1), &mut used_code, usize::max_value(), usize::max_value()).unwrap(); - let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::new(db.as_hash_db(), &addr2), &mut used_code, usize::max_value(), usize::max_value()).unwrap(); + let p1 = Progress::default(); + let p2 = Progress::default(); + let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::new(db.as_hash_db(), &addr1), &mut used_code, usize::max_value(), usize::max_value(), &p1).unwrap(); + let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::new(db.as_hash_db(), &addr2), &mut used_code, usize::max_value(), usize::max_value(), &p2).unwrap(); assert_eq!(used_code.len(), 1); let fat_rlp1 = Rlp::new(&fat_rlp1[0]).at(1).unwrap(); @@ -350,6 +368,6 @@ mod tests { #[test] fn encoding_empty_acc() { let mut db = get_temp_state_db(); - assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &Address::default()), Rlp::new(&::rlp::NULL_RLP), H256::zero()).unwrap(), (ACC_EMPTY, None)); + assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &Address::zero()), Rlp::new(&::rlp::NULL_RLP), H256::zero()).unwrap(), (ACC_EMPTY, None)); } } diff --git a/ethcore/src/snapshot/error.rs b/ethcore/src/snapshot/error.rs index b71f79f80..6faa19da2 100644 --- a/ethcore/src/snapshot/error.rs +++ b/ethcore/src/snapshot/error.rs @@ -61,6 +61,8 @@ pub enum Error { ChunkTooLarge, /// Snapshots not supported by the consensus engine. SnapshotsUnsupported, + /// Aborted snapshot + SnapshotAborted, /// Bad epoch transition. BadEpochProof(u64), /// Wrong chunk format. @@ -91,6 +93,7 @@ impl fmt::Display for Error { Error::ChunkTooSmall => write!(f, "Chunk size is too small."), Error::ChunkTooLarge => write!(f, "Chunk size is too large."), Error::SnapshotsUnsupported => write!(f, "Snapshots unsupported by consensus engine."), + Error::SnapshotAborted => write!(f, "Snapshot was aborted."), Error::BadEpochProof(i) => write!(f, "Bad epoch proof for transition to epoch {}", i), Error::WrongChunkFormat(ref msg) => write!(f, "Wrong chunk format: {}", msg), Error::UnlinkedAncientBlockChain => write!(f, "Unlinked ancient blocks chain"), diff --git a/ethcore/src/snapshot/io.rs b/ethcore/src/snapshot/io.rs index c5f178cd3..536862e7b 100644 --- a/ethcore/src/snapshot/io.rs +++ b/ethcore/src/snapshot/io.rs @@ -310,10 +310,7 @@ impl LooseReader { dir.pop(); - Ok(LooseReader { - dir: dir, - manifest: manifest, - }) + Ok(LooseReader { dir, manifest }) } } diff --git a/ethcore/src/snapshot/mod.rs b/ethcore/src/snapshot/mod.rs index c05d0de6a..ae7a21709 100644 --- a/ethcore/src/snapshot/mod.rs +++ b/ethcore/src/snapshot/mod.rs @@ -22,7 +22,7 @@ use std::collections::{HashMap, HashSet}; use std::cmp; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY}; use account_db::{AccountDB, AccountDBMut}; @@ -107,7 +107,7 @@ impl Default for SnapshotConfiguration { fn default() -> Self { SnapshotConfiguration { no_periodic: false, - processing_threads: ::std::cmp::max(1, num_cpus::get() / 2), + processing_threads: ::std::cmp::max(1, num_cpus::get_physical() / 2), } } } @@ -117,8 +117,9 @@ impl Default for SnapshotConfiguration { pub struct Progress { accounts: AtomicUsize, blocks: AtomicUsize, - size: AtomicUsize, // Todo [rob] use Atomicu64 when it stabilizes. + size: AtomicU64, done: AtomicBool, + abort: AtomicBool, } impl Progress { @@ -127,6 +128,7 @@ impl Progress { self.accounts.store(0, Ordering::Release); self.blocks.store(0, Ordering::Release); self.size.store(0, Ordering::Release); + self.abort.store(false, Ordering::Release); // atomic fence here to ensure the others are written first? // logs might very rarely get polluted if not. @@ -140,7 +142,7 @@ impl Progress { pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Acquire) } /// Get the written size of the snapshot in bytes. - pub fn size(&self) -> usize { self.size.load(Ordering::Acquire) } + pub fn size(&self) -> u64 { self.size.load(Ordering::Acquire) } /// Whether the snapshot is complete. pub fn done(&self) -> bool { self.done.load(Ordering::Acquire) } @@ -148,27 +150,28 @@ impl Progress { } /// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer. pub fn take_snapshot( - engine: &EthEngine, + chunker: Box, chain: &BlockChain, - block_at: H256, - state_db: &HashDB, + block_hash: H256, + state_db: &dyn HashDB, writer: W, p: &Progress, processing_threads: usize, ) -> Result<(), Error> { - let start_header = chain.block_header_data(&block_at) - .ok_or(Error::InvalidStartingBlock(BlockId::Hash(block_at)))?; + let start_header = chain.block_header_data(&block_hash) + .ok_or_else(|| Error::InvalidStartingBlock(BlockId::Hash(block_hash)))?; let state_root = start_header.state_root(); - let number = start_header.number(); + let block_number = start_header.number(); - info!("Taking snapshot starting at block {}", number); + info!("Taking snapshot starting at block {}", block_number); + let version = chunker.current_version(); let writer = Mutex::new(writer); - let chunker = engine.snapshot_components().ok_or(Error::SnapshotsUnsupported)?; - let snapshot_version = chunker.current_version(); let (state_hashes, block_hashes) = scope(|scope| -> Result<(Vec, Vec), Error> { let writer = &writer; - let block_guard = scope.spawn(move || chunk_secondary(chunker, chain, block_at, writer, p)); + let block_guard = scope.spawn(move || { + chunk_secondary(chunker, chain, block_hash, writer, p) + }); // The number of threads must be between 1 and SNAPSHOT_SUBPARTS assert!(processing_threads >= 1, "Cannot use less than 1 threads for creating snapshots"); @@ -183,7 +186,7 @@ pub fn take_snapshot( for part in (thread_idx..SNAPSHOT_SUBPARTS).step_by(num_threads) { debug!(target: "snapshot", "Chunking part {} in thread {}", part, thread_idx); - let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part))?; + let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part), thread_idx)?; chunk_hashes.append(&mut hashes); } @@ -207,12 +210,12 @@ pub fn take_snapshot( info!(target: "snapshot", "produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len()); let manifest_data = ManifestData { - version: snapshot_version, - state_hashes: state_hashes, - block_hashes: block_hashes, - state_root: state_root, - block_number: number, - block_hash: block_at, + version, + state_hashes, + block_hashes, + state_root, + block_number, + block_hash, }; writer.into_inner().finish(manifest_data)?; @@ -228,7 +231,13 @@ pub fn take_snapshot( /// Secondary chunks are engine-specific, but they intend to corroborate the state data /// in the state chunks. /// Returns a list of chunk hashes, with the first having the blocks furthest from the genesis. -pub fn chunk_secondary<'a>(mut chunker: Box, chain: &'a BlockChain, start_hash: H256, writer: &Mutex, progress: &'a Progress) -> Result, Error> { +pub fn chunk_secondary<'a>( + mut chunker: Box, + chain: &'a BlockChain, + start_hash: H256, + writer: &Mutex, + progress: &'a Progress +) -> Result, Error> { let mut chunk_hashes = Vec::new(); let mut snappy_buffer = vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)]; @@ -243,7 +252,7 @@ pub fn chunk_secondary<'a>(mut chunker: Box, chain: &'a Bloc trace!(target: "snapshot", "wrote secondary chunk. hash: {:x}, size: {}, uncompressed size: {}", hash, size, raw_data.len()); - progress.size.fetch_add(size, Ordering::SeqCst); + progress.size.fetch_add(size as u64, Ordering::SeqCst); chunk_hashes.push(hash); Ok(()) }; @@ -266,8 +275,9 @@ struct StateChunker<'a> { rlps: Vec, cur_size: usize, snappy_buffer: Vec, - writer: &'a Mutex, + writer: &'a Mutex, progress: &'a Progress, + thread_idx: usize, } impl<'a> StateChunker<'a> { @@ -297,10 +307,10 @@ impl<'a> StateChunker<'a> { let hash = keccak(&compressed); self.writer.lock().write_state_chunk(hash, compressed)?; - trace!(target: "snapshot", "wrote state chunk. size: {}, uncompressed size: {}", compressed_size, raw_data.len()); + trace!(target: "snapshot", "Thread {} wrote state chunk. size: {}, uncompressed size: {}", self.thread_idx, compressed_size, raw_data.len()); self.progress.accounts.fetch_add(num_entries, Ordering::SeqCst); - self.progress.size.fetch_add(compressed_size, Ordering::SeqCst); + self.progress.size.fetch_add(compressed_size as u64, Ordering::SeqCst); self.hashes.push(hash); self.cur_size = 0; @@ -321,7 +331,14 @@ impl<'a> StateChunker<'a> { /// /// Returns a list of hashes of chunks created, or any error it may /// have encountered. -pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: &Mutex, progress: &'a Progress, part: Option) -> Result, Error> { +pub fn chunk_state<'a>( + db: &dyn HashDB, + root: &H256, + writer: &Mutex, + progress: &'a Progress, + part: Option, + thread_idx: usize, +) -> Result, Error> { let account_trie = TrieDB::new(&db, &root)?; let mut chunker = StateChunker { @@ -329,8 +346,9 @@ pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: rlps: Vec::new(), cur_size: 0, snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)], - writer: writer, - progress: progress, + writer, + progress, + thread_idx, }; let mut used_code = HashSet::new(); @@ -365,7 +383,7 @@ pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: let account = ::rlp::decode(&*account_data)?; let account_db = AccountDB::from_hash(db, account_key_hash); - let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE)?; + let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE, progress)?; for (i, fat_rlp) in fat_rlps.into_iter().enumerate() { if i > 0 { chunker.write_chunk()?; @@ -383,7 +401,7 @@ pub fn chunk_state<'a>(db: &HashDB, root: &H256, writer: /// Used to rebuild the state trie piece by piece. pub struct StateRebuilder { - db: Box, + db: Box, state_root: H256, known_code: HashMap, // code hashes mapped to first account with this code. missing_code: HashMap>, // maps code hashes to lists of accounts missing that code. @@ -393,7 +411,7 @@ pub struct StateRebuilder { impl StateRebuilder { /// Create a new state rebuilder to write into the given backing DB. - pub fn new(db: Arc, pruning: Algorithm) -> Self { + pub fn new(db: Arc, pruning: Algorithm) -> Self { StateRebuilder { db: journaldb::new(db.clone(), pruning, ::db::COL_STATE), state_root: KECCAK_NULL_RLP, @@ -411,7 +429,7 @@ impl StateRebuilder { let mut pairs = Vec::with_capacity(rlp.item_count()?); // initialize the pairs vector with empty values so we have slots to write into. - pairs.resize(rlp.item_count()?, (H256::new(), Vec::new())); + pairs.resize(rlp.item_count()?, (H256::zero(), Vec::new())); let status = rebuild_accounts( self.db.as_hash_db_mut(), @@ -468,7 +486,7 @@ impl StateRebuilder { /// Finalize the restoration. Check for accounts missing code and make a dummy /// journal entry. /// Once all chunks have been fed, there should be nothing missing. - pub fn finalize(mut self, era: u64, id: H256) -> Result, ::error::Error> { + pub fn finalize(mut self, era: u64, id: H256) -> Result, ::error::Error> { let missing = self.missing_code.keys().cloned().collect::>(); if !missing.is_empty() { return Err(Error::MissingCode(missing).into()) } @@ -493,7 +511,7 @@ struct RebuiltStatus { // rebuild a set of accounts and their storage. // returns a status detailing newly-loaded code and accounts missing code. fn rebuild_accounts( - db: &mut HashDB, + db: &mut dyn HashDB, account_fat_rlps: Rlp, out_chunk: &mut [(H256, Bytes)], known_code: &HashMap, @@ -512,7 +530,7 @@ fn rebuild_accounts( // fill out the storage trie and code while decoding. let (acc, maybe_code) = { let mut acct_db = AccountDBMut::from_hash(db, hash); - let storage_root = known_storage_roots.get(&hash).cloned().unwrap_or(H256::zero()); + let storage_root = known_storage_roots.get(&hash).cloned().unwrap_or_default(); account::from_fat_rlp(&mut acct_db, fat_rlp, storage_root)? }; @@ -560,7 +578,7 @@ const POW_VERIFY_RATE: f32 = 0.02; /// Verify an old block with the given header, engine, blockchain, body. If `always` is set, it will perform /// the fullest verification possible. If not, it will take a random sample to determine whether it will /// do heavy or light verification. -pub fn verify_old_block(rng: &mut OsRng, header: &Header, engine: &EthEngine, chain: &BlockChain, always: bool) -> Result<(), ::error::Error> { +pub fn verify_old_block(rng: &mut OsRng, header: &Header, engine: &dyn EthEngine, chain: &BlockChain, always: bool) -> Result<(), ::error::Error> { engine.verify_block_basic(header)?; if always || rng.gen::() <= POW_VERIFY_RATE { diff --git a/ethcore/src/snapshot/service.rs b/ethcore/src/snapshot/service.rs index 4b3f196cb..ddae76a00 100644 --- a/ethcore/src/snapshot/service.rs +++ b/ethcore/src/snapshot/service.rs @@ -415,7 +415,7 @@ impl Service { _ => break, } - // Writting changes to DB and logging every now and then + // Writing changes to DB and logging every now and then if block_number % 1_000 == 0 { next_db.key_value().write_buffered(batch); next_chain.commit(); @@ -479,16 +479,12 @@ impl Service { let guard = Guard::new(temp_dir.clone()); let res = client.take_snapshot(writer, BlockId::Number(num), &self.progress); - self.taking_snapshot.store(false, Ordering::SeqCst); if let Err(e) = res { if client.chain_info().best_block_number >= num + client.pruning_history() { - // "Cancelled" is mincing words a bit -- what really happened - // is that the state we were snapshotting got pruned out - // before we could finish. - info!("Periodic snapshot failed: block state pruned.\ - Run with a longer `--pruning-history` or with `--no-periodic-snapshot`"); - return Ok(()) + // The state we were snapshotting was pruned before we could finish. + info!("Periodic snapshot failed: block state pruned. Run with a longer `--pruning-history` or with `--no-periodic-snapshot`"); + return Err(e); } else { return Err(e); } @@ -846,14 +842,29 @@ impl SnapshotService for Service { } } + fn abort_snapshot(&self) { + if self.taking_snapshot.load(Ordering::SeqCst) { + trace!(target: "snapshot", "Aborting snapshot – Snapshot under way"); + self.progress.abort.store(true, Ordering::SeqCst); + } + } + fn shutdown(&self) { + trace!(target: "snapshot", "Shut down SnapshotService"); self.abort_restore(); + trace!(target: "snapshot", "Shut down SnapshotService - restore aborted"); + self.abort_snapshot(); + trace!(target: "snapshot", "Shut down SnapshotService - snapshot aborted"); } } impl Drop for Service { fn drop(&mut self) { + trace!(target: "shutdown", "Dropping Service"); self.abort_restore(); + trace!(target: "shutdown", "Dropping Service - restore aborted"); + self.abort_snapshot(); + trace!(target: "shutdown", "Dropping Service - snapshot aborted"); } } diff --git a/ethcore/src/snapshot/tests/service.rs b/ethcore/src/snapshot/tests/service.rs index 37a10048a..515e5992f 100644 --- a/ethcore/src/snapshot/tests/service.rs +++ b/ethcore/src/snapshot/tests/service.rs @@ -188,14 +188,15 @@ fn keep_ancient_blocks() { &state_root, &writer, &Progress::default(), - None + None, + 0 ).unwrap(); let manifest = ::snapshot::ManifestData { version: 2, - state_hashes: state_hashes, - state_root: state_root, - block_hashes: block_hashes, + state_hashes, + state_root, + block_hashes, block_number: NUM_BLOCKS, block_hash: best_hash, }; diff --git a/ethcore/src/snapshot/tests/state.rs b/ethcore/src/snapshot/tests/state.rs index fa7df6b61..0d9760332 100644 --- a/ethcore/src/snapshot/tests/state.rs +++ b/ethcore/src/snapshot/tests/state.rs @@ -55,7 +55,7 @@ fn snap_and_restore() { let mut state_hashes = Vec::new(); for part in 0..SNAPSHOT_SUBPARTS { - let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), Some(part)).unwrap(); + let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), Some(part), 0).unwrap(); state_hashes.append(&mut hashes); } @@ -126,8 +126,8 @@ fn get_code_from_prev_chunk() { let mut make_chunk = |acc, hash| { let mut db = journaldb::new_memory_db(); AccountDBMut::from_hash(&mut db, hash).insert(&code[..]); - - let fat_rlp = account::to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value()).unwrap(); + let p = Progress::default(); + let fat_rlp = account::to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value(), &p).unwrap(); let mut stream = RlpStream::new_list(1); stream.append_raw(&fat_rlp[0], 1); stream.out() @@ -171,13 +171,13 @@ fn checks_flag() { let state_root = producer.state_root(); let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap()); - let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), None).unwrap(); + let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), None, 0).unwrap(); writer.into_inner().finish(::snapshot::ManifestData { version: 2, - state_hashes: state_hashes, + state_hashes, block_hashes: Vec::new(), - state_root: state_root, + state_root, block_number: 0, block_hash: H256::default(), }).unwrap(); diff --git a/ethcore/src/snapshot/traits.rs b/ethcore/src/snapshot/traits.rs index bb4ab3b39..aa61b595b 100644 --- a/ethcore/src/snapshot/traits.rs +++ b/ethcore/src/snapshot/traits.rs @@ -55,6 +55,9 @@ pub trait SnapshotService : Sync + Send { /// no-op if currently restoring. fn restore_block_chunk(&self, hash: H256, chunk: Bytes); + /// Abort in-progress snapshotting if there is one. + fn abort_snapshot(&self); + /// Shutdown the Snapshot Service by aborting any ongoing restore fn shutdown(&self); } diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index 60ea5e1f0..a1bb9eeda 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -28,7 +28,7 @@ use types::view; use types::views::BlockView; use block::IsBlock; -use client::{BlockChainClient, Client, ClientConfig, BlockId, ChainInfo, BlockInfo, PrepareOpenBlock, ImportSealedBlock, ImportBlock}; +use client::{BlockChainClient, BlockChainReset, Client, ClientConfig, BlockId, ChainInfo, BlockInfo, PrepareOpenBlock, ImportSealedBlock, ImportBlock}; use ethereum; use executive::{Executive, TransactOptions}; use miner::{Miner, PendingOrdering, MinerService}; @@ -367,3 +367,23 @@ fn transaction_proof() { assert_eq!(state.balance(&Address::default()).unwrap(), 5.into()); assert_eq!(state.balance(&address).unwrap(), 95.into()); } + +#[test] +fn reset_blockchain() { + let client = get_test_client_with_blocks(get_good_dummy_block_seq(19)); + // 19 + genesis block + assert!(client.block_header(BlockId::Number(20)).is_some()); + assert_eq!(client.block_header(BlockId::Number(20)).unwrap().hash(), client.best_block_header().hash()); + + assert!(client.reset(5).is_ok()); + + client.chain().clear_cache(); + + assert!(client.block_header(BlockId::Number(20)).is_none()); + assert!(client.block_header(BlockId::Number(19)).is_none()); + assert!(client.block_header(BlockId::Number(18)).is_none()); + assert!(client.block_header(BlockId::Number(17)).is_none()); + assert!(client.block_header(BlockId::Number(16)).is_none()); + + assert!(client.block_header(BlockId::Number(15)).is_some()); +} diff --git a/ethcore/src/verification/verification.rs b/ethcore/src/verification/verification.rs index 331672602..2ddc7fae9 100644 --- a/ethcore/src/verification/verification.rs +++ b/ethcore/src/verification/verification.rs @@ -40,7 +40,6 @@ use types::{BlockNumber, header::Header}; use types::transaction::SignedTransaction; use verification::queue::kind::blocks::Unverified; -#[cfg(not(time_checked_add))] use time_utils::CheckedSystemTime; /// Preprocessed block data gathered in `verify_block_unordered` call @@ -310,7 +309,7 @@ pub fn verify_header_params(header: &Header, engine: &EthEngine, is_full: bool, // this will resist overflow until `year 2037` let max_time = SystemTime::now() + ACCEPTABLE_DRIFT; let invalid_threshold = max_time + ACCEPTABLE_DRIFT * 9; - let timestamp = UNIX_EPOCH.checked_add(Duration::from_secs(header.timestamp())) + let timestamp = CheckedSystemTime::checked_add(UNIX_EPOCH, Duration::from_secs(header.timestamp())) .ok_or(BlockError::TimestampOverflow)?; if timestamp > invalid_threshold { @@ -334,9 +333,9 @@ fn verify_parent(header: &Header, parent: &Header, engine: &EthEngine) -> Result if !engine.is_timestamp_valid(header.timestamp(), parent.timestamp()) { let now = SystemTime::now(); - let min = now.checked_add(Duration::from_secs(parent.timestamp().saturating_add(1))) + let min = CheckedSystemTime::checked_add(now, Duration::from_secs(parent.timestamp().saturating_add(1))) .ok_or(BlockError::TimestampOverflow)?; - let found = now.checked_add(Duration::from_secs(header.timestamp())) + let found = CheckedSystemTime::checked_add(now, Duration::from_secs(header.timestamp())) .ok_or(BlockError::TimestampOverflow)?; return Err(From::from(BlockError::InvalidTimestamp(OutOfBounds { max: None, min: Some(min), found }))) } diff --git a/ethcore/sync/src/tests/snapshot.rs b/ethcore/sync/src/tests/snapshot.rs index a3aa77d36..d865adc2a 100644 --- a/ethcore/sync/src/tests/snapshot.rs +++ b/ethcore/sync/src/tests/snapshot.rs @@ -122,6 +122,8 @@ impl SnapshotService for TestSnapshotService { self.block_restoration_chunks.lock().clear(); } + fn abort_snapshot(&self) {} + fn restore_state_chunk(&self, hash: H256, chunk: Bytes) { if self.restoration_manifest.lock().as_ref().map_or(false, |m| m.state_hashes.iter().any(|h| h == &hash)) { self.state_restoration_chunks.lock().insert(hash, chunk); diff --git a/parity/configuration.rs b/parity/configuration.rs index 198a58c3f..1a8338637 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -932,7 +932,7 @@ impl Configuration { no_periodic: self.args.flag_no_periodic_snapshot, processing_threads: match self.args.arg_snapshot_threads { Some(threads) if threads > 0 => threads, - _ => ::std::cmp::max(1, num_cpus::get() / 2), + _ => ::std::cmp::max(1, num_cpus::get_physical() / 2), }, }; diff --git a/parity/lib.rs b/parity/lib.rs index 9141bdcad..a88a68947 100644 --- a/parity/lib.rs +++ b/parity/lib.rs @@ -15,7 +15,6 @@ // along with Parity Ethereum. If not, see . //! Ethcore client application. - #![warn(missing_docs)] extern crate ansi_term; diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 9287f6272..035990a9d 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -252,6 +252,7 @@ pub struct FullDependencies { pub gas_price_percentile: usize, pub poll_lifetime: u32, pub allow_missing_blocks: bool, + pub no_ancient_blocks: bool, } impl FullDependencies { @@ -303,6 +304,7 @@ impl FullDependencies { gas_price_percentile: self.gas_price_percentile, allow_missing_blocks: self.allow_missing_blocks, allow_experimental_rpcs: self.experimental_rpcs, + no_ancient_blocks: self.no_ancient_blocks } ); handler.extend_with(client.to_delegate()); diff --git a/parity/run.rs b/parity/run.rs index c4a3c7512..87f2ccab5 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -753,6 +753,7 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: gas_price_percentile: cmd.gas_price_percentile, poll_lifetime: cmd.poll_lifetime, allow_missing_blocks: cmd.allow_missing_blocks, + no_ancient_blocks: !cmd.download_old_blocks, }); let dependencies = rpc::Dependencies { @@ -903,17 +904,27 @@ impl RunningClient { // Create a weak reference to the client so that we can wait on shutdown // until it is dropped let weak_client = Arc::downgrade(&client); - // Shutdown and drop the ServiceClient + // Shutdown and drop the ClientService client_service.shutdown(); + trace!(target: "shutdown", "ClientService shut down"); drop(client_service); + trace!(target: "shutdown", "ClientService dropped"); // drop this stuff as soon as exit detected. drop(rpc); + trace!(target: "shutdown", "RPC dropped"); drop(keep_alive); + trace!(target: "shutdown", "KeepAlive dropped"); // to make sure timer does not spawn requests while shutdown is in progress informant.shutdown(); + trace!(target: "shutdown", "Informant shut down"); // just Arc is dropping here, to allow other reference release in its default time drop(informant); + trace!(target: "shutdown", "Informant dropped"); drop(client); + trace!(target: "shutdown", "Client dropped"); + // This may help when debugging ref cycles. Requires nightly-only `#![feature(weak_counts)]` + // trace!(target: "shutdown", "Waiting for refs to Client to shutdown, strong_count={:?}, weak_count={:?}", weak_client.strong_count(), weak_client.weak_count()); + trace!(target: "shutdown", "Waiting for refs to Client to shutdown"); wait_for_drop(weak_client); } } @@ -947,24 +958,30 @@ fn print_running_environment(data_dir: &str, dirs: &Directories, db_dirs: &Datab } fn wait_for_drop(w: Weak) { - let sleep_duration = Duration::from_secs(1); - let warn_timeout = Duration::from_secs(60); - let max_timeout = Duration::from_secs(300); + const SLEEP_DURATION: Duration = Duration::from_secs(1); + const WARN_TIMEOUT: Duration = Duration::from_secs(60); + const MAX_TIMEOUT: Duration = Duration::from_secs(300); let instant = Instant::now(); let mut warned = false; - while instant.elapsed() < max_timeout { + while instant.elapsed() < MAX_TIMEOUT { if w.upgrade().is_none() { return; } - if !warned && instant.elapsed() > warn_timeout { + if !warned && instant.elapsed() > WARN_TIMEOUT { warned = true; warn!("Shutdown is taking longer than expected."); } - thread::sleep(sleep_duration); + thread::sleep(SLEEP_DURATION); + + // When debugging shutdown issues on a nightly build it can help to enable this with the + // `#![feature(weak_counts)]` added to lib.rs (TODO: enable when + // https://github.com/rust-lang/rust/issues/57977 is stable) + // trace!(target: "shutdown", "Waiting for client to drop, strong_count={:?}, weak_count={:?}", w.strong_count(), w.weak_count()); + trace!(target: "shutdown", "Waiting for client to drop"); } warn!("Shutdown timeout reached, exiting uncleanly."); diff --git a/parity/snapshot.rs b/parity/snapshot.rs index 70957762f..269965c33 100644 --- a/parity/snapshot.rs +++ b/parity/snapshot.rs @@ -261,7 +261,7 @@ impl SnapshotCommand { let cur_size = p.size(); if cur_size != last_size { last_size = cur_size; - let bytes = ::informant::format_bytes(p.size()); + let bytes = ::informant::format_bytes(cur_size as usize); info!("Snapshot: {} accounts {} blocks {}", p.accounts(), p.blocks(), bytes); } diff --git a/rpc/src/v1/helpers/errors.rs b/rpc/src/v1/helpers/errors.rs index 1ff40f979..6949b0403 100644 --- a/rpc/src/v1/helpers/errors.rs +++ b/rpc/src/v1/helpers/errors.rs @@ -29,6 +29,7 @@ use light::on_demand::error::{Error as OnDemandError, ErrorKind as OnDemandError use ethcore::client::BlockChainClient; use types::blockchain_info::BlockChainInfo; use v1::types::BlockNumber; +use v1::impls::EthClientOptions; mod codes { // NOTE [ToDr] Codes from [-32099, -32000] @@ -221,18 +222,34 @@ pub fn cannot_submit_work(err: EthcoreError) -> Error { } } -pub fn unavailable_block() -> Error { - Error { - code: ErrorCode::ServerError(codes::UNSUPPORTED_REQUEST), - message: "Ancient block sync is still in progress".into(), - data: None, +pub fn unavailable_block(no_ancient_block: bool, by_hash: bool) -> Error { + if no_ancient_block { + Error { + code: ErrorCode::ServerError(codes::UNSUPPORTED_REQUEST), + message: "Looks like you disabled ancient block download, unfortunately the information you're \ + trying to fetch doesn't exist in the db and is probably in the ancient blocks.".into(), + data: None, + } + } else if by_hash { + Error { + code: ErrorCode::ServerError(codes::UNSUPPORTED_REQUEST), + message: "Block information is incomplete while ancient block sync is still in progress, before \ + it's finished we can't determine the existence of requested item.".into(), + data: None, + } + } else { + Error { + code: ErrorCode::ServerError(codes::UNSUPPORTED_REQUEST), + message: "Requested block number is in a range that is not available yet, because the ancient block sync is still in progress.".into(), + data: None, + } } } pub fn check_block_number_existence<'a, T, C>( client: &'a C, num: BlockNumber, - allow_missing_blocks: bool, + options: EthClientOptions, ) -> impl Fn(Option) -> RpcResult> + 'a where C: BlockChainClient, @@ -242,8 +259,8 @@ pub fn check_block_number_existence<'a, T, C>( if let BlockNumber::Num(block_number) = num { // tried to fetch block number and got nothing even though the block number is // less than the latest block number - if block_number < client.chain_info().best_block_number && !allow_missing_blocks { - return Err(unavailable_block()); + if block_number < client.chain_info().best_block_number && !options.allow_missing_blocks { + return Err(unavailable_block(options.no_ancient_blocks, false)); } } } @@ -253,22 +270,17 @@ pub fn check_block_number_existence<'a, T, C>( pub fn check_block_gap<'a, T, C>( client: &'a C, - allow_missing_blocks: bool, + options: EthClientOptions, ) -> impl Fn(Option) -> RpcResult> + 'a where C: BlockChainClient, { move |response| { - if response.is_none() && !allow_missing_blocks { + if response.is_none() && !options.allow_missing_blocks { let BlockChainInfo { ancient_block_hash, .. } = client.chain_info(); // block information was requested, but unfortunately we couldn't find it and there // are gaps in the database ethcore/src/blockchain/blockchain.rs if ancient_block_hash.is_some() { - return Err(Error { - code: ErrorCode::ServerError(codes::UNSUPPORTED_REQUEST), - message: "Block information is incomplete while ancient block sync is still in progress, before \ - it's finished we can't determine the existence of requested item.".into(), - data: None, - }) + return Err(unavailable_block(options.no_ancient_blocks, true)) } } Ok(response) diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 2e2fd05c2..acc3a0876 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -54,6 +54,7 @@ use v1::metadata::Metadata; const EXTRA_INFO_PROOF: &str = "Object exists in blockchain (fetched earlier), extra_info is always available if object exists; qed"; /// Eth RPC options +#[derive(Copy, Clone)] pub struct EthClientOptions { /// Return nonce from transaction queue when pending block not available. pub pending_nonce_from_queue: bool, @@ -68,6 +69,8 @@ pub struct EthClientOptions { pub allow_missing_blocks: bool, /// Enable Experimental RPC-Calls pub allow_experimental_rpcs: bool, + /// flag for ancient block sync + pub no_ancient_blocks: bool, } impl EthClientOptions { @@ -89,6 +92,7 @@ impl Default for EthClientOptions { gas_price_percentile: 50, allow_missing_blocks: false, allow_experimental_rpcs: false, + no_ancient_blocks: false, } } } @@ -674,7 +678,7 @@ impl Eth for EthClient< let trx_count = self.client.block(BlockId::Hash(hash.into())) .map(|block| block.transactions_count().into()); let result = Ok(trx_count) - .and_then(errors::check_block_gap(&*self.client, self.options.allow_missing_blocks)); + .and_then(errors::check_block_gap(&*self.client, self.options)); Box::new(future::done(result)) } @@ -689,7 +693,7 @@ impl Eth for EthClient< .and_then(errors::check_block_number_existence( &*self.client, num, - self.options.allow_missing_blocks + self.options )) } })) @@ -699,7 +703,7 @@ impl Eth for EthClient< let uncle_count = self.client.block(BlockId::Hash(hash.into())) .map(|block| block.uncles_count().into()); let result = Ok(uncle_count) - .and_then(errors::check_block_gap(&*self.client, self.options.allow_missing_blocks)); + .and_then(errors::check_block_gap(&*self.client, self.options)); Box::new(future::done(result)) } @@ -713,7 +717,7 @@ impl Eth for EthClient< .and_then(errors::check_block_number_existence( &*self.client, num, - self.options.allow_missing_blocks + self.options )) } })) @@ -734,14 +738,14 @@ impl Eth for EthClient< } fn block_by_hash(&self, hash: H256, include_txs: bool) -> BoxFuture> { - let result = self.rich_block(BlockId::Hash(hash.into()).into(), include_txs) - .and_then(errors::check_block_gap(&*self.client, self.options.allow_missing_blocks)); + let result = self.rich_block(BlockId::Hash(hash).into(), include_txs) + .and_then(errors::check_block_gap(&*self.client, self.options)); Box::new(future::done(result)) } fn block_by_number(&self, num: BlockNumber, include_txs: bool) -> BoxFuture> { let result = self.rich_block(num.clone().into(), include_txs).and_then( - errors::check_block_number_existence(&*self.client, num, self.options.allow_missing_blocks)); + errors::check_block_number_existence(&*self.client, num, self.options)); Box::new(future::done(result)) } @@ -751,14 +755,14 @@ impl Eth for EthClient< .map(|t| Transaction::from_pending(t.pending().clone())) }); let result = Ok(tx).and_then( - errors::check_block_gap(&*self.client, self.options.allow_missing_blocks)); + errors::check_block_gap(&*self.client, self.options)); Box::new(future::done(result)) } fn transaction_by_block_hash_and_index(&self, hash: H256, index: Index) -> BoxFuture> { let id = PendingTransactionId::Location(PendingOrBlock::Block(BlockId::Hash(hash.into())), index.value()); let result = self.transaction(id).and_then( - errors::check_block_gap(&*self.client, self.options.allow_missing_blocks)); + errors::check_block_gap(&*self.client, self.options)); Box::new(future::done(result)) } @@ -772,7 +776,7 @@ impl Eth for EthClient< let transaction_id = PendingTransactionId::Location(block_id, index.value()); let result = self.transaction(transaction_id).and_then( - errors::check_block_number_existence(&*self.client, num, self.options.allow_missing_blocks)); + errors::check_block_number_existence(&*self.client, num, self.options)); Box::new(future::done(result)) } @@ -786,7 +790,7 @@ impl Eth for EthClient< let receipt = self.client.transaction_receipt(TransactionId::Hash(hash)); let result = Ok(receipt.map(Into::into)) - .and_then(errors::check_block_gap(&*self.client, self.options.allow_missing_blocks)); + .and_then(errors::check_block_gap(&*self.client, self.options)); Box::new(future::done(result)) } @@ -794,7 +798,7 @@ impl Eth for EthClient< let result = self.uncle(PendingUncleId { id: PendingOrBlock::Block(BlockId::Hash(hash.into())), position: index.value() - }).and_then(errors::check_block_gap(&*self.client, self.options.allow_missing_blocks)); + }).and_then(errors::check_block_gap(&*self.client, self.options)); Box::new(future::done(result)) } @@ -811,7 +815,7 @@ impl Eth for EthClient< .and_then(errors::check_block_number_existence( &*self.client, num, - self.options.allow_missing_blocks + self.options )); Box::new(future::done(result)) diff --git a/rpc/src/v1/tests/eth.rs b/rpc/src/v1/tests/eth.rs index 68710ae55..9c2273746 100644 --- a/rpc/src/v1/tests/eth.rs +++ b/rpc/src/v1/tests/eth.rs @@ -143,7 +143,8 @@ impl EthTester { send_block_number_in_get_work: true, gas_price_percentile: 50, allow_experimental_rpcs: true, - allow_missing_blocks: false + allow_missing_blocks: false, + no_ancient_blocks: false }, ); diff --git a/rpc/src/v1/tests/helpers/snapshot_service.rs b/rpc/src/v1/tests/helpers/snapshot_service.rs index 5450886bb..881c434e1 100644 --- a/rpc/src/v1/tests/helpers/snapshot_service.rs +++ b/rpc/src/v1/tests/helpers/snapshot_service.rs @@ -48,6 +48,7 @@ impl SnapshotService for TestSnapshotService { fn status(&self) -> RestorationStatus { self.status.lock().clone() } fn begin_restore(&self, _manifest: ManifestData) { } fn abort_restore(&self) { } + fn abort_snapshot(&self) {} fn restore_state_chunk(&self, _hash: H256, _chunk: Bytes) { } fn restore_block_chunk(&self, _hash: H256, _chunk: Bytes) { } fn shutdown(&self) { } diff --git a/scripts/docker/hub/publish-docker.sh b/scripts/docker/hub/publish-docker.sh index 6602d55c2..84feedb28 100755 --- a/scripts/docker/hub/publish-docker.sh +++ b/scripts/docker/hub/publish-docker.sh @@ -3,7 +3,9 @@ set -e # fail on any error VERSION=$(cat ./tools/VERSION) +TRACK=$(cat ./tools/TRACK) echo "Parity Ethereum version = ${VERSION}" +echo "Parity Ethereum track = ${TRACK}" test "$Docker_Hub_User_Parity" -a "$Docker_Hub_Pass_Parity" \ || ( echo "no docker credentials provided"; exit 1 ) @@ -44,6 +46,14 @@ case "${SCHEDULE_TAG:-${CI_COMMIT_REF_NAME}}" in --file tools/Dockerfile .; docker push "parity/parity:${VERSION}-${CI_COMMIT_REF_NAME}"; docker push "parity/parity:stable";; + v[0-9]*.[0-9]*) + echo "Docker TAG - 'parity/parity:${VERSION}-${TRACK}'" + docker build --no-cache \ + --build-arg VCS_REF="${CI_COMMIT_SHA}" \ + --build-arg BUILD_DATE="$(date -u '+%Y-%m-%dT%H:%M:%SZ')" \ + --tag "parity/parity:${VERSION}-${TRACK}" \ + --file tools/Dockerfile .; + docker push "parity/parity:${VERSION}-${TRACK}";; *) echo "Docker TAG - 'parity/parity:${VERSION}-${CI_COMMIT_REF_NAME}'" docker build --no-cache \ diff --git a/util/network-devp2p/src/discovery.rs b/util/network-devp2p/src/discovery.rs index 7bf8dc62e..f18469e16 100644 --- a/util/network-devp2p/src/discovery.rs +++ b/util/network-devp2p/src/discovery.rs @@ -168,7 +168,6 @@ pub struct Discovery<'a> { discovery_id: NodeId, discovery_nodes: HashSet, node_buckets: Vec, - // Sometimes we don't want to add nodes to the NodeTable, but still want to // keep track of them to avoid excessive pinging (happens when an unknown node sends // a discovery request to us -- the node might be on a different net). @@ -257,7 +256,7 @@ impl<'a> Discovery<'a> { Ok(()) => None, Err(BucketError::Ourselves) => None, Err(BucketError::NotInTheBucket{node_entry, bucket_distance}) => Some((node_entry, bucket_distance)) - }.map(|(node_entry, bucket_distance)| { + }.and_then(|(node_entry, bucket_distance)| { trace!(target: "discovery", "Adding a new node {:?} into our bucket {}", &node_entry, bucket_distance); let mut added = HashMap::with_capacity(1); @@ -265,7 +264,7 @@ impl<'a> Discovery<'a> { let node_to_ping = { let bucket = &mut self.node_buckets[bucket_distance]; - bucket.nodes.push_front(BucketEntry::new(node_entry)); + bucket.nodes.push_front(BucketEntry::new(node_entry.clone())); if bucket.nodes.len() > BUCKET_SIZE { select_bucket_ping(bucket.nodes.iter()) } else { @@ -275,7 +274,12 @@ impl<'a> Discovery<'a> { if let Some(node) = node_to_ping { self.try_ping(node, PingReason::Default); }; - TableUpdates{added, removed: HashSet::new()} + + if node_entry.endpoint.is_valid_sync_node() { + Some(TableUpdates { added, removed: HashSet::new() }) + } else { + None + } }) } @@ -518,7 +522,18 @@ impl<'a> Discovery<'a> { fn on_ping(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr, echo_hash: &[u8]) -> Result, Error> { trace!(target: "discovery", "Got Ping from {:?}", &from); - let ping_from = NodeEndpoint::from_rlp(&rlp.at(1)?)?; + let ping_from = if let Ok(node_endpoint) = NodeEndpoint::from_rlp(&rlp.at(1)?) { + node_endpoint + } else { + let mut address = from.clone(); + // address here is the node's tcp port. If we are unable to get the `NodeEndpoint` from the `ping_from` + // rlp field then this is most likely a BootNode, set the tcp port to 0 because it can not be used for syncing. + address.set_port(0); + NodeEndpoint { + address, + udp_port: from.port() + } + }; let ping_to = NodeEndpoint::from_rlp(&rlp.at(2)?)?; let timestamp: u64 = rlp.val_at(3)?; self.check_timestamp(timestamp)?; @@ -540,7 +555,7 @@ impl<'a> Discovery<'a> { self.send_packet(PACKET_PONG, from, &response.drain())?; let entry = NodeEntry { id: *node_id, endpoint: pong_to.clone() }; - if !entry.endpoint.is_valid() { + if !entry.endpoint.is_valid_discovery_node() { debug!(target: "discovery", "Got bad address: {:?}", entry); } else if !self.is_allowed(&entry) { debug!(target: "discovery", "Address not allowed: {:?}", entry); @@ -728,7 +743,7 @@ impl<'a> Discovery<'a> { trace!(target: "discovery", "Got {} Neighbours from {:?}", results_count, &from); for r in rlp.at(0)?.iter() { let endpoint = NodeEndpoint::from_rlp(&r)?; - if !endpoint.is_valid() { + if !endpoint.is_valid_discovery_node() { debug!(target: "discovery", "Bad address: {:?}", endpoint); continue; } diff --git a/util/network-devp2p/src/node_table.rs b/util/network-devp2p/src/node_table.rs index 3cee93fd9..db001bfe7 100644 --- a/util/network-devp2p/src/node_table.rs +++ b/util/network-devp2p/src/node_table.rs @@ -103,10 +103,16 @@ impl NodeEndpoint { self.to_rlp(rlp); } - /// Validates that the port is not 0 and address IP is specified - pub fn is_valid(&self) -> bool { - self.udp_port != 0 && self.address.port() != 0 && - match self.address { + /// Validates that the tcp port is not 0 and that the node is a valid discovery node (i.e. `is_valid_discovery_node()` is true). + /// Sync happens over tcp. + pub fn is_valid_sync_node(&self) -> bool { + self.is_valid_discovery_node() && self.address.port() != 0 + } + + /// Validates that the udp port is not 0 and address IP is specified. + /// Peer discovery happens over udp. + pub fn is_valid_discovery_node(&self) -> bool { + self.udp_port != 0 && match self.address { SocketAddr::V4(a) => !a.ip().is_unspecified(), SocketAddr::V6(a) => !a.ip().is_unspecified() } diff --git a/util/version/Cargo.toml b/util/version/Cargo.toml index edfb05ef5..bb32df700 100644 --- a/util/version/Cargo.toml +++ b/util/version/Cargo.toml @@ -3,7 +3,7 @@ [package] name = "parity-version" # NOTE: this value is used for Parity Ethereum version string (via env CARGO_PKG_VERSION) -version = "2.4.7" +version = "2.4.8" authors = ["Parity Technologies "] build = "build.rs"