Stable 2.4.8 (#10779)

* ethcore/res: activate atlantis classic hf on block 8772000 (#10766)

* fix docker tags for publishing (#10741)

* merge-backports

* Update version

* remove clique engine from backports

* Reset blockchain properly (#10669)

* delete BlockDetails from COL_EXTRA

* better proofs

* added tests

* PR suggestions

* adds rpc error message for --no-ancient-blocks (#10608)

* adds error message for --no-ancient-blocks, closes #10261

* Apply suggestions from code review

Co-Authored-By: seunlanlege <seunlanlege@gmail.com>

* Treat empty account the same as non-exist accounts in EIP-1052 (#10775)

* fix: aura don't add `SystemTime::now()` (#10720)

This commit does the following:
- Prevent overflow in `verify_timestamp()` by not adding `now` to found faulty timestamp
- Use explicit `CheckedSystemTime::checked_add` to prevent potential consensus issues because SystemTime is platform
depedent
- remove `#[cfg(not(time_checked_add))]` conditional compilation

* DevP2p: Get node IP address and udp port from Socket, if not included in PING packet (#10705)

* get node IP address and udp port from Socket, if not included in PING packet

* prevent bootnodes from being added to host nodes

* code corrections

* code corrections

* code corrections

* code corrections

* docs

* code corrections

* code corrections

* Apply suggestions from code review

Co-Authored-By: David <dvdplm@gmail.com>

* Revert "fix: aura don't add `SystemTime::now()` (#10720)"

This reverts commit f104784849ec58c768b022e690db0eaba1607253.

* Add a way to signal shutdown to snapshotting threads (#10744)

* Add a way to signal shutdown to snapshotting threads

* Pass Progress to fat_rlps() so we can abort from there too.

* Checking for abort in a single spot

* Remove nightly-only weak/strong counts

* fix warning

* Fix tests

* Add dummy impl to abort snapshots

* Add another dummy impl for TestSnapshotService

* Remove debugging code

* Return error instead of the odd Ok(())
Switch to AtomicU64

* revert .as_bytes() change

* fix build

* fix build maybe
This commit is contained in:
s3krit 2019-06-25 13:38:37 +00:00 committed by GitHub
parent d6c55469c9
commit 25435c6e7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 381 additions and 207 deletions

12
Cargo.lock generated
View File

@ -2464,7 +2464,7 @@ dependencies = [
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"jni 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)",
"panic_hook 0.1.0",
"parity-ethereum 2.4.7",
"parity-ethereum 2.4.8",
"tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-current-thread 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -2494,7 +2494,7 @@ dependencies = [
[[package]]
name = "parity-ethereum"
version = "2.4.7"
version = "2.4.8"
dependencies = [
"ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)",
"atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2547,7 +2547,7 @@ dependencies = [
"parity-rpc 1.12.0",
"parity-runtime 0.1.0",
"parity-updater 1.12.0",
"parity-version 2.4.7",
"parity-version 2.4.8",
"parity-whisper 0.1.0",
"parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"pretty_assertions 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2698,7 +2698,7 @@ dependencies = [
"parity-crypto 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-runtime 0.1.0",
"parity-updater 1.12.0",
"parity-version 2.4.7",
"parity-version 2.4.8",
"parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"pretty_assertions 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2797,7 +2797,7 @@ dependencies = [
"parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-hash-fetch 1.12.0",
"parity-path 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-version 2.4.7",
"parity-version 2.4.8",
"parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2807,7 +2807,7 @@ dependencies = [
[[package]]
name = "parity-version"
version = "2.4.7"
version = "2.4.8"
dependencies = [
"parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rlp 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -2,7 +2,7 @@
description = "Parity Ethereum client"
name = "parity-ethereum"
# NOTE Make sure to update util/version/Cargo.toml as well
version = "2.4.7"
version = "2.4.8"
license = "GPL-3.0"
authors = ["Parity Technologies <admin@parity.io>"]

View File

@ -668,21 +668,6 @@ impl BlockChain {
self.db.key_value().read_with_cache(db::COL_EXTRA, &self.block_details, parent).map_or(false, |d| d.children.contains(hash))
}
/// fetches the list of blocks from best block to n, and n's parent hash
/// where n > 0
pub fn block_headers_from_best_block(&self, n: u32) -> Option<(Vec<encoded::Header>, H256)> {
let mut blocks = Vec::with_capacity(n as usize);
let mut hash = self.best_block_hash();
for _ in 0..n {
let current_hash = self.block_header_data(&hash)?;
hash = current_hash.parent_hash();
blocks.push(current_hash);
}
Some((blocks, hash))
}
/// Returns a tree route between `from` and `to`, which is a tuple of:
///
/// - a vector of hashes of all blocks, ordered from `from` to `to`.
@ -869,6 +854,14 @@ impl BlockChain {
}
}
/// clears all caches for testing purposes
pub fn clear_cache(&self) {
self.block_bodies.write().clear();
self.block_details.write().clear();
self.block_hashes.write().clear();
self.block_headers.write().clear();
}
/// Update the best ancient block to the given hash, after checking that
/// it's directly linked to the currently known best ancient block
pub fn update_best_ancient_block(&self, hash: &H256) {

View File

@ -12,7 +12,7 @@
"ecip1010PauseTransition": "0x2dc6c0",
"ecip1010ContinueTransition": "0x4c4b40",
"ecip1017EraRounds": "0x4c4b40",
"eip100bTransition": "0x7fffffffffffffff",
"eip100bTransition": "0x85d9a0",
"bombDefuseTransition": "0x5a06e0"
}
}
@ -29,15 +29,15 @@
"forkCanonHash": "0x94365e3a8c0b35089c1d1195081fe7489b528a84b22199c916180db8b28ade7f",
"eip150Transition": "0x2625a0",
"eip160Transition": "0x2dc6c0",
"eip161abcTransition": "0x7fffffffffffffff",
"eip161dTransition": "0x7fffffffffffffff",
"eip161abcTransition": "0x85d9a0",
"eip161dTransition": "0x85d9a0",
"eip155Transition": "0x2dc6c0",
"maxCodeSize": "0x6000",
"maxCodeSizeTransition": "0x7fffffffffffffff",
"eip140Transition": "0x7fffffffffffffff",
"eip211Transition": "0x7fffffffffffffff",
"eip214Transition": "0x7fffffffffffffff",
"eip658Transition": "0x7fffffffffffffff"
"maxCodeSizeTransition": "0x85d9a0",
"eip140Transition": "0x85d9a0",
"eip211Transition": "0x85d9a0",
"eip214Transition": "0x85d9a0",
"eip658Transition": "0x85d9a0"
},
"genesis": {
"seal": {
@ -3905,7 +3905,7 @@
"0x0000000000000000000000000000000000000005": {
"builtin": {
"name": "modexp",
"activate_at": "0x7fffffffffffffff",
"activate_at": "0x85d9a0",
"pricing": {
"modexp": {
"divisor": 20
@ -3916,7 +3916,7 @@
"0x0000000000000000000000000000000000000006": {
"builtin": {
"name": "alt_bn128_add",
"activate_at": "0x7fffffffffffffff",
"activate_at": "0x85d9a0",
"pricing": {
"linear": {
"base": 500,
@ -3928,7 +3928,7 @@
"0x0000000000000000000000000000000000000007": {
"builtin": {
"name": "alt_bn128_mul",
"activate_at": "0x7fffffffffffffff",
"activate_at": "0x85d9a0",
"pricing": {
"linear": {
"base": 40000,
@ -3940,7 +3940,7 @@
"0x0000000000000000000000000000000000000008": {
"builtin": {
"name": "alt_bn128_pairing",
"activate_at": "0x7fffffffffffffff",
"activate_at": "0x85d9a0",
"pricing": {
"alt_bn128_pairing": {
"base": 100000,

View File

@ -30,8 +30,10 @@ use blockchain::{BlockChainDB, BlockChainDBHandler};
use ethcore::client::{Client, ClientConfig, ChainNotify, ClientIoMessage};
use ethcore::miner::Miner;
use ethcore::snapshot::service::{Service as SnapshotService, ServiceParams as SnapServiceParams};
use ethcore::snapshot::{SnapshotService as _SnapshotService, RestorationStatus};
use ethcore::snapshot::{SnapshotService as _SnapshotService, RestorationStatus, Error as SnapshotError};
use ethcore::spec::Spec;
use ethcore::error::{Error as EthcoreError, ErrorKind};
use ethcore_private_tx::{self, Importer, Signer};
use Error;
@ -197,6 +199,7 @@ impl ClientService {
/// Shutdown the Client Service
pub fn shutdown(&self) {
trace!(target: "shutdown", "Shutting down Client Service");
self.snapshot.shutdown();
}
}
@ -257,7 +260,11 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
let res = thread::Builder::new().name("Periodic Snapshot".into()).spawn(move || {
if let Err(e) = snapshot.take_snapshot(&*client, num) {
warn!("Failed to take snapshot at block #{}: {}", num, e);
match e {
EthcoreError(ErrorKind::Snapshot(SnapshotError::SnapshotAborted), _) => info!("Snapshot aborted"),
_ => warn!("Failed to take snapshot at block #{}: {}", num, e),
}
}
});

View File

@ -26,7 +26,7 @@ use bytes::Bytes;
use call_contract::{CallContract, RegistryInfo};
use ethcore_miner::pool::VerifiedTransaction;
use ethcore_miner::service_transaction_checker::ServiceTransactionChecker;
use ethereum_types::{H256, Address, U256};
use ethereum_types::{H256, H264, Address, U256};
use evm::Schedule;
use hash::keccak;
use io::IoChannel;
@ -87,7 +87,7 @@ pub use types::blockchain_info::BlockChainInfo;
pub use types::block_status::BlockStatus;
pub use blockchain::CacheSize as BlockChainCacheSize;
pub use verification::QueueInfo as BlockQueueInfo;
use db::Writable;
use db::{Writable, Readable, keys::BlockDetails};
use_contract!(registry, "res/contracts/registrar.json");
@ -772,8 +772,8 @@ impl Client {
liveness: AtomicBool::new(awake),
mode: Mutex::new(config.mode.clone()),
chain: RwLock::new(chain),
tracedb: tracedb,
engine: engine,
tracedb,
engine,
pruning: config.pruning.clone(),
db: RwLock::new(db.clone()),
state_db: RwLock::new(state_db),
@ -786,8 +786,8 @@ impl Client {
ancient_blocks_import_lock: Default::default(),
queue_consensus_message: IoChannelQueue::new(usize::max_value()),
last_hashes: RwLock::new(VecDeque::new()),
factories: factories,
history: history,
factories,
history,
on_user_defaults_change: Mutex::new(None),
registrar_address,
exit_handler: Mutex::new(None),
@ -1146,7 +1146,12 @@ impl Client {
/// Take a snapshot at the given block.
/// If the ID given is "latest", this will default to 1000 blocks behind.
pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(&self, writer: W, at: BlockId, p: &snapshot::Progress) -> Result<(), EthcoreError> {
pub fn take_snapshot<W: snapshot_io::SnapshotWriter + Send>(
&self,
writer: W,
at: BlockId,
p: &snapshot::Progress,
) -> Result<(), EthcoreError> {
let db = self.state_db.read().journal_db().boxed_clone();
let best_block_number = self.chain_info().best_block_number;
let block_number = self.block_number(at).ok_or(snapshot::Error::InvalidStartingBlock(at))?;
@ -1176,8 +1181,16 @@ impl Client {
};
let processing_threads = self.config.snapshot.processing_threads;
snapshot::take_snapshot(&*self.engine, &self.chain.read(), start_hash, db.as_hash_db(), writer, p, processing_threads)?;
let chunker = self.engine.snapshot_components().ok_or(snapshot::Error::SnapshotsUnsupported)?;
snapshot::take_snapshot(
chunker,
&self.chain.read(),
start_hash,
db.as_hash_db(),
writer,
p,
processing_threads,
)?;
Ok(())
}
@ -1335,37 +1348,60 @@ impl BlockChainReset for Client {
fn reset(&self, num: u32) -> Result<(), String> {
if num as u64 > self.pruning_history() {
return Err("Attempting to reset to block with pruned state".into())
} else if num == 0 {
return Err("invalid number of blocks to reset".into())
}
let (blocks_to_delete, best_block_hash) = self.chain.read()
.block_headers_from_best_block(num)
.ok_or("Attempted to reset past genesis block")?;
let mut blocks_to_delete = Vec::with_capacity(num as usize);
let mut best_block_hash = self.chain.read().best_block_hash();
let mut batch = DBTransaction::with_capacity(blocks_to_delete.capacity());
let mut db_transaction = DBTransaction::with_capacity((num + 1) as usize);
for _ in 0..num {
let current_header = self.chain.read().block_header_data(&best_block_hash)
.expect("best_block_hash was fetched from db; block_header_data should exist in db; qed");
best_block_hash = current_header.parent_hash();
for hash in &blocks_to_delete {
db_transaction.delete(::db::COL_HEADERS, &hash.hash());
db_transaction.delete(::db::COL_BODIES, &hash.hash());
db_transaction.delete(::db::COL_EXTRA, &hash.hash());
let (number, hash) = (current_header.number(), current_header.hash());
batch.delete(::db::COL_HEADERS, &hash);
batch.delete(::db::COL_BODIES, &hash);
Writable::delete::<BlockDetails, H264>
(&mut batch, ::db::COL_EXTRA, &hash);
Writable::delete::<H256, BlockNumberKey>
(&mut db_transaction, ::db::COL_EXTRA, &hash.number());
(&mut batch, ::db::COL_EXTRA, &number);
blocks_to_delete.push((number, hash));
}
let hashes = blocks_to_delete.iter().map(|(_, hash)| hash).collect::<Vec<_>>();
info!("Deleting block hashes {}",
Colour::Red
.bold()
.paint(format!("{:#?}", hashes))
);
let mut best_block_details = Readable::read::<BlockDetails, H264>(
&**self.db.read().key_value(),
::db::COL_EXTRA,
&best_block_hash
).expect("block was previously imported; best_block_details should exist; qed");
let (_, last_hash) = blocks_to_delete.last()
.expect("num is > 0; blocks_to_delete can't be empty; qed");
// remove the last block as a child so that it can be re-imported
// ethcore/blockchain/src/blockchain.rs/Blockchain::is_known_child()
best_block_details.children.retain(|h| *h != *last_hash);
batch.write(
::db::COL_EXTRA,
&best_block_hash,
&best_block_details
);
// update the new best block hash
db_transaction.put(::db::COL_EXTRA, b"best", &*best_block_hash);
batch.put(::db::COL_EXTRA, b"best", &best_block_hash);
self.db.read()
.key_value()
.write(db_transaction)
.map_err(|err| format!("could not complete reset operation; io error occured: {}", err))?;
let hashes = blocks_to_delete.iter().map(|b| b.hash()).collect::<Vec<_>>();
info!("Deleting block hashes {}",
Colour::Red
.bold()
.paint(format!("{:#?}", hashes))
);
.write(batch)
.map_err(|err| format!("could not delete blocks; io error occurred: {}", err))?;
info!("New best block hash {}", Colour::Green.bold().paint(format!("{:?}", best_block_hash)));

View File

@ -22,7 +22,7 @@ use std::iter::FromIterator;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::sync::{Weak, Arc};
use std::time::{UNIX_EPOCH, SystemTime, Duration};
use std::time::{UNIX_EPOCH, Duration};
use block::*;
use client::EngineClient;
@ -42,14 +42,12 @@ use itertools::{self, Itertools};
use rlp::{encode, Decodable, DecoderError, Encodable, RlpStream, Rlp};
use ethereum_types::{H256, H520, Address, U128, U256};
use parking_lot::{Mutex, RwLock};
use time_utils::CheckedSystemTime;
use types::BlockNumber;
use types::header::{Header, ExtendedHeader};
use types::ancestry_action::AncestryAction;
use unexpected::{Mismatch, OutOfBounds};
#[cfg(not(time_checked_add))]
use time_utils::CheckedSystemTime;
mod finality;
/// `AuthorityRound` params.
@ -574,10 +572,10 @@ fn verify_timestamp(step: &Step, header_step: u64) -> Result<(), BlockError> {
// Returning it further won't recover the sync process.
trace!(target: "engine", "verify_timestamp: block too early");
let now = SystemTime::now();
let found = now.checked_add(Duration::from_secs(oob.found)).ok_or(BlockError::TimestampOverflow)?;
let max = oob.max.and_then(|m| now.checked_add(Duration::from_secs(m)));
let min = oob.min.and_then(|m| now.checked_add(Duration::from_secs(m)));
let found = CheckedSystemTime::checked_add(UNIX_EPOCH, Duration::from_secs(oob.found))
.ok_or(BlockError::TimestampOverflow)?;
let max = oob.max.and_then(|m| CheckedSystemTime::checked_add(UNIX_EPOCH, Duration::from_secs(m)));
let min = oob.min.and_then(|m| CheckedSystemTime::checked_add(UNIX_EPOCH, Duration::from_secs(m)));
let new_oob = OutOfBounds { min, max, found };

View File

@ -314,7 +314,11 @@ impl<'a, T: 'a, V: 'a, B: 'a> Ext for Externalities<'a, T, V, B>
}
fn extcodehash(&self, address: &Address) -> vm::Result<Option<H256>> {
Ok(self.state.code_hash(address)?)
if self.state.exists_and_not_null(address)? {
Ok(self.state.code_hash(address)?)
} else {
Ok(None)
}
}
fn extcodesize(&self, address: &Address) -> vm::Result<Option<usize>> {

View File

@ -15,7 +15,6 @@
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
#![warn(missing_docs, unused_extern_crates)]
#![cfg_attr(feature = "time_checked_add", feature(time_checked_add))]
//! Ethcore library
//!
@ -101,6 +100,7 @@ extern crate rlp;
extern crate rustc_hex;
extern crate serde;
extern crate stats;
extern crate time_utils;
extern crate triehash_ethereum as triehash;
extern crate unexpected;
extern crate using_queue;
@ -150,9 +150,6 @@ extern crate fetch;
#[cfg(all(test, feature = "price-info"))]
extern crate parity_runtime;
#[cfg(not(time_checked_add))]
extern crate time_utils;
pub mod block;
pub mod builtin;
pub mod client;

View File

@ -24,9 +24,10 @@ use ethtrie::{TrieDB, TrieDBMut};
use hash::{KECCAK_EMPTY, KECCAK_NULL_RLP};
use hash_db::HashDB;
use rlp::{RlpStream, Rlp};
use snapshot::Error;
use snapshot::{Error, Progress};
use std::collections::HashSet;
use trie::{Trie, TrieMut};
use std::sync::atomic::Ordering;
// An empty account -- these were replaced with RLP null data for a space optimization in v1.
const ACC_EMPTY: BasicAccount = BasicAccount {
@ -65,8 +66,16 @@ impl CodeState {
// walk the account's storage trie, returning a vector of RLP items containing the
// account address hash, account properties and the storage. Each item contains at most `max_storage_items`
// storage records split according to snapshot format definition.
pub fn to_fat_rlps(account_hash: &H256, acc: &BasicAccount, acct_db: &AccountDB, used_code: &mut HashSet<H256>, first_chunk_size: usize, max_chunk_size: usize) -> Result<Vec<Bytes>, Error> {
let db = &(acct_db as &HashDB<_,_>);
pub fn to_fat_rlps(
account_hash: &H256,
acc: &BasicAccount,
acct_db: &AccountDB,
used_code: &mut HashSet<H256>,
first_chunk_size: usize,
max_chunk_size: usize,
p: &Progress,
) -> Result<Vec<Bytes>, Error> {
let db = &(acct_db as &dyn HashDB<_,_>);
let db = TrieDB::new(db, &acc.storage_root)?;
let mut chunks = Vec::new();
let mut db_iter = db.iter()?;
@ -112,6 +121,10 @@ pub fn to_fat_rlps(account_hash: &H256, acc: &BasicAccount, acct_db: &AccountDB,
}
loop {
if p.abort.load(Ordering::SeqCst) {
trace!(target: "snapshot", "to_fat_rlps: aborting snapshot");
return Err(Error::SnapshotAborted);
}
match db_iter.next() {
Some(Ok((k, v))) => {
let pair = {
@ -211,6 +224,7 @@ mod tests {
use types::basic_account::BasicAccount;
use test_helpers::get_temp_state_db;
use snapshot::tests::helpers::fill_storage;
use snapshot::Progress;
use hash::{KECCAK_EMPTY, KECCAK_NULL_RLP, keccak};
use ethereum_types::{H256, Address};
@ -236,8 +250,8 @@ mod tests {
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value()).unwrap();
let p = Progress::default();
let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap();
assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &addr), fat_rlp, H256::zero()).unwrap().0, account);
}
@ -262,7 +276,9 @@ mod tests {
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value()).unwrap();
let p = Progress::default();
let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlp[0]).at(1).unwrap();
assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &addr), fat_rlp, H256::zero()).unwrap().0, account);
}
@ -287,7 +303,8 @@ mod tests {
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), 500, 1000).unwrap();
let p = Progress::default();
let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::new(db.as_hash_db(), &addr), &mut Default::default(), 500, 1000, &p).unwrap();
let mut root = KECCAK_NULL_RLP;
let mut restored_account = None;
for rlp in fat_rlps {
@ -319,20 +336,21 @@ mod tests {
nonce: 50.into(),
balance: 123456789.into(),
storage_root: KECCAK_NULL_RLP,
code_hash: code_hash,
code_hash,
};
let account2 = BasicAccount {
nonce: 400.into(),
balance: 98765432123456789usize.into(),
storage_root: KECCAK_NULL_RLP,
code_hash: code_hash,
code_hash,
};
let mut used_code = HashSet::new();
let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::new(db.as_hash_db(), &addr1), &mut used_code, usize::max_value(), usize::max_value()).unwrap();
let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::new(db.as_hash_db(), &addr2), &mut used_code, usize::max_value(), usize::max_value()).unwrap();
let p1 = Progress::default();
let p2 = Progress::default();
let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::new(db.as_hash_db(), &addr1), &mut used_code, usize::max_value(), usize::max_value(), &p1).unwrap();
let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::new(db.as_hash_db(), &addr2), &mut used_code, usize::max_value(), usize::max_value(), &p2).unwrap();
assert_eq!(used_code.len(), 1);
let fat_rlp1 = Rlp::new(&fat_rlp1[0]).at(1).unwrap();
@ -350,6 +368,6 @@ mod tests {
#[test]
fn encoding_empty_acc() {
let mut db = get_temp_state_db();
assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &Address::default()), Rlp::new(&::rlp::NULL_RLP), H256::zero()).unwrap(), (ACC_EMPTY, None));
assert_eq!(from_fat_rlp(&mut AccountDBMut::new(db.as_hash_db_mut(), &Address::zero()), Rlp::new(&::rlp::NULL_RLP), H256::zero()).unwrap(), (ACC_EMPTY, None));
}
}

View File

@ -61,6 +61,8 @@ pub enum Error {
ChunkTooLarge,
/// Snapshots not supported by the consensus engine.
SnapshotsUnsupported,
/// Aborted snapshot
SnapshotAborted,
/// Bad epoch transition.
BadEpochProof(u64),
/// Wrong chunk format.
@ -91,6 +93,7 @@ impl fmt::Display for Error {
Error::ChunkTooSmall => write!(f, "Chunk size is too small."),
Error::ChunkTooLarge => write!(f, "Chunk size is too large."),
Error::SnapshotsUnsupported => write!(f, "Snapshots unsupported by consensus engine."),
Error::SnapshotAborted => write!(f, "Snapshot was aborted."),
Error::BadEpochProof(i) => write!(f, "Bad epoch proof for transition to epoch {}", i),
Error::WrongChunkFormat(ref msg) => write!(f, "Wrong chunk format: {}", msg),
Error::UnlinkedAncientBlockChain => write!(f, "Unlinked ancient blocks chain"),

View File

@ -310,10 +310,7 @@ impl LooseReader {
dir.pop();
Ok(LooseReader {
dir: dir,
manifest: manifest,
})
Ok(LooseReader { dir, manifest })
}
}

View File

@ -22,7 +22,7 @@
use std::collections::{HashMap, HashSet};
use std::cmp;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY};
use account_db::{AccountDB, AccountDBMut};
@ -107,7 +107,7 @@ impl Default for SnapshotConfiguration {
fn default() -> Self {
SnapshotConfiguration {
no_periodic: false,
processing_threads: ::std::cmp::max(1, num_cpus::get() / 2),
processing_threads: ::std::cmp::max(1, num_cpus::get_physical() / 2),
}
}
}
@ -117,8 +117,9 @@ impl Default for SnapshotConfiguration {
pub struct Progress {
accounts: AtomicUsize,
blocks: AtomicUsize,
size: AtomicUsize, // Todo [rob] use Atomicu64 when it stabilizes.
size: AtomicU64,
done: AtomicBool,
abort: AtomicBool,
}
impl Progress {
@ -127,6 +128,7 @@ impl Progress {
self.accounts.store(0, Ordering::Release);
self.blocks.store(0, Ordering::Release);
self.size.store(0, Ordering::Release);
self.abort.store(false, Ordering::Release);
// atomic fence here to ensure the others are written first?
// logs might very rarely get polluted if not.
@ -140,7 +142,7 @@ impl Progress {
pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Acquire) }
/// Get the written size of the snapshot in bytes.
pub fn size(&self) -> usize { self.size.load(Ordering::Acquire) }
pub fn size(&self) -> u64 { self.size.load(Ordering::Acquire) }
/// Whether the snapshot is complete.
pub fn done(&self) -> bool { self.done.load(Ordering::Acquire) }
@ -148,27 +150,28 @@ impl Progress {
}
/// Take a snapshot using the given blockchain, starting block hash, and database, writing into the given writer.
pub fn take_snapshot<W: SnapshotWriter + Send>(
engine: &EthEngine,
chunker: Box<dyn SnapshotComponents>,
chain: &BlockChain,
block_at: H256,
state_db: &HashDB<KeccakHasher, DBValue>,
block_hash: H256,
state_db: &dyn HashDB<KeccakHasher, DBValue>,
writer: W,
p: &Progress,
processing_threads: usize,
) -> Result<(), Error> {
let start_header = chain.block_header_data(&block_at)
.ok_or(Error::InvalidStartingBlock(BlockId::Hash(block_at)))?;
let start_header = chain.block_header_data(&block_hash)
.ok_or_else(|| Error::InvalidStartingBlock(BlockId::Hash(block_hash)))?;
let state_root = start_header.state_root();
let number = start_header.number();
let block_number = start_header.number();
info!("Taking snapshot starting at block {}", number);
info!("Taking snapshot starting at block {}", block_number);
let version = chunker.current_version();
let writer = Mutex::new(writer);
let chunker = engine.snapshot_components().ok_or(Error::SnapshotsUnsupported)?;
let snapshot_version = chunker.current_version();
let (state_hashes, block_hashes) = scope(|scope| -> Result<(Vec<H256>, Vec<H256>), Error> {
let writer = &writer;
let block_guard = scope.spawn(move || chunk_secondary(chunker, chain, block_at, writer, p));
let block_guard = scope.spawn(move || {
chunk_secondary(chunker, chain, block_hash, writer, p)
});
// The number of threads must be between 1 and SNAPSHOT_SUBPARTS
assert!(processing_threads >= 1, "Cannot use less than 1 threads for creating snapshots");
@ -183,7 +186,7 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
for part in (thread_idx..SNAPSHOT_SUBPARTS).step_by(num_threads) {
debug!(target: "snapshot", "Chunking part {} in thread {}", part, thread_idx);
let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part))?;
let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part), thread_idx)?;
chunk_hashes.append(&mut hashes);
}
@ -207,12 +210,12 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
info!(target: "snapshot", "produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len());
let manifest_data = ManifestData {
version: snapshot_version,
state_hashes: state_hashes,
block_hashes: block_hashes,
state_root: state_root,
block_number: number,
block_hash: block_at,
version,
state_hashes,
block_hashes,
state_root,
block_number,
block_hash,
};
writer.into_inner().finish(manifest_data)?;
@ -228,7 +231,13 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
/// Secondary chunks are engine-specific, but they intend to corroborate the state data
/// in the state chunks.
/// Returns a list of chunk hashes, with the first having the blocks furthest from the genesis.
pub fn chunk_secondary<'a>(mut chunker: Box<SnapshotComponents>, chain: &'a BlockChain, start_hash: H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> {
pub fn chunk_secondary<'a>(
mut chunker: Box<dyn SnapshotComponents>,
chain: &'a BlockChain,
start_hash: H256,
writer: &Mutex<dyn SnapshotWriter + 'a>,
progress: &'a Progress
) -> Result<Vec<H256>, Error> {
let mut chunk_hashes = Vec::new();
let mut snappy_buffer = vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)];
@ -243,7 +252,7 @@ pub fn chunk_secondary<'a>(mut chunker: Box<SnapshotComponents>, chain: &'a Bloc
trace!(target: "snapshot", "wrote secondary chunk. hash: {:x}, size: {}, uncompressed size: {}",
hash, size, raw_data.len());
progress.size.fetch_add(size, Ordering::SeqCst);
progress.size.fetch_add(size as u64, Ordering::SeqCst);
chunk_hashes.push(hash);
Ok(())
};
@ -266,8 +275,9 @@ struct StateChunker<'a> {
rlps: Vec<Bytes>,
cur_size: usize,
snappy_buffer: Vec<u8>,
writer: &'a Mutex<SnapshotWriter + 'a>,
writer: &'a Mutex<dyn SnapshotWriter + 'a>,
progress: &'a Progress,
thread_idx: usize,
}
impl<'a> StateChunker<'a> {
@ -297,10 +307,10 @@ impl<'a> StateChunker<'a> {
let hash = keccak(&compressed);
self.writer.lock().write_state_chunk(hash, compressed)?;
trace!(target: "snapshot", "wrote state chunk. size: {}, uncompressed size: {}", compressed_size, raw_data.len());
trace!(target: "snapshot", "Thread {} wrote state chunk. size: {}, uncompressed size: {}", self.thread_idx, compressed_size, raw_data.len());
self.progress.accounts.fetch_add(num_entries, Ordering::SeqCst);
self.progress.size.fetch_add(compressed_size, Ordering::SeqCst);
self.progress.size.fetch_add(compressed_size as u64, Ordering::SeqCst);
self.hashes.push(hash);
self.cur_size = 0;
@ -321,7 +331,14 @@ impl<'a> StateChunker<'a> {
///
/// Returns a list of hashes of chunks created, or any error it may
/// have encountered.
pub fn chunk_state<'a>(db: &HashDB<KeccakHasher, DBValue>, root: &H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress, part: Option<usize>) -> Result<Vec<H256>, Error> {
pub fn chunk_state<'a>(
db: &dyn HashDB<KeccakHasher, DBValue>,
root: &H256,
writer: &Mutex<dyn SnapshotWriter + 'a>,
progress: &'a Progress,
part: Option<usize>,
thread_idx: usize,
) -> Result<Vec<H256>, Error> {
let account_trie = TrieDB::new(&db, &root)?;
let mut chunker = StateChunker {
@ -329,8 +346,9 @@ pub fn chunk_state<'a>(db: &HashDB<KeccakHasher, DBValue>, root: &H256, writer:
rlps: Vec::new(),
cur_size: 0,
snappy_buffer: vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)],
writer: writer,
progress: progress,
writer,
progress,
thread_idx,
};
let mut used_code = HashSet::new();
@ -365,7 +383,7 @@ pub fn chunk_state<'a>(db: &HashDB<KeccakHasher, DBValue>, root: &H256, writer:
let account = ::rlp::decode(&*account_data)?;
let account_db = AccountDB::from_hash(db, account_key_hash);
let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE)?;
let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE, progress)?;
for (i, fat_rlp) in fat_rlps.into_iter().enumerate() {
if i > 0 {
chunker.write_chunk()?;
@ -383,7 +401,7 @@ pub fn chunk_state<'a>(db: &HashDB<KeccakHasher, DBValue>, root: &H256, writer:
/// Used to rebuild the state trie piece by piece.
pub struct StateRebuilder {
db: Box<JournalDB>,
db: Box<dyn JournalDB>,
state_root: H256,
known_code: HashMap<H256, H256>, // code hashes mapped to first account with this code.
missing_code: HashMap<H256, Vec<H256>>, // maps code hashes to lists of accounts missing that code.
@ -393,7 +411,7 @@ pub struct StateRebuilder {
impl StateRebuilder {
/// Create a new state rebuilder to write into the given backing DB.
pub fn new(db: Arc<KeyValueDB>, pruning: Algorithm) -> Self {
pub fn new(db: Arc<dyn KeyValueDB>, pruning: Algorithm) -> Self {
StateRebuilder {
db: journaldb::new(db.clone(), pruning, ::db::COL_STATE),
state_root: KECCAK_NULL_RLP,
@ -411,7 +429,7 @@ impl StateRebuilder {
let mut pairs = Vec::with_capacity(rlp.item_count()?);
// initialize the pairs vector with empty values so we have slots to write into.
pairs.resize(rlp.item_count()?, (H256::new(), Vec::new()));
pairs.resize(rlp.item_count()?, (H256::zero(), Vec::new()));
let status = rebuild_accounts(
self.db.as_hash_db_mut(),
@ -468,7 +486,7 @@ impl StateRebuilder {
/// Finalize the restoration. Check for accounts missing code and make a dummy
/// journal entry.
/// Once all chunks have been fed, there should be nothing missing.
pub fn finalize(mut self, era: u64, id: H256) -> Result<Box<JournalDB>, ::error::Error> {
pub fn finalize(mut self, era: u64, id: H256) -> Result<Box<dyn JournalDB>, ::error::Error> {
let missing = self.missing_code.keys().cloned().collect::<Vec<_>>();
if !missing.is_empty() { return Err(Error::MissingCode(missing).into()) }
@ -493,7 +511,7 @@ struct RebuiltStatus {
// rebuild a set of accounts and their storage.
// returns a status detailing newly-loaded code and accounts missing code.
fn rebuild_accounts(
db: &mut HashDB<KeccakHasher, DBValue>,
db: &mut dyn HashDB<KeccakHasher, DBValue>,
account_fat_rlps: Rlp,
out_chunk: &mut [(H256, Bytes)],
known_code: &HashMap<H256, H256>,
@ -512,7 +530,7 @@ fn rebuild_accounts(
// fill out the storage trie and code while decoding.
let (acc, maybe_code) = {
let mut acct_db = AccountDBMut::from_hash(db, hash);
let storage_root = known_storage_roots.get(&hash).cloned().unwrap_or(H256::zero());
let storage_root = known_storage_roots.get(&hash).cloned().unwrap_or_default();
account::from_fat_rlp(&mut acct_db, fat_rlp, storage_root)?
};
@ -560,7 +578,7 @@ const POW_VERIFY_RATE: f32 = 0.02;
/// Verify an old block with the given header, engine, blockchain, body. If `always` is set, it will perform
/// the fullest verification possible. If not, it will take a random sample to determine whether it will
/// do heavy or light verification.
pub fn verify_old_block(rng: &mut OsRng, header: &Header, engine: &EthEngine, chain: &BlockChain, always: bool) -> Result<(), ::error::Error> {
pub fn verify_old_block(rng: &mut OsRng, header: &Header, engine: &dyn EthEngine, chain: &BlockChain, always: bool) -> Result<(), ::error::Error> {
engine.verify_block_basic(header)?;
if always || rng.gen::<f32>() <= POW_VERIFY_RATE {

View File

@ -415,7 +415,7 @@ impl Service {
_ => break,
}
// Writting changes to DB and logging every now and then
// Writing changes to DB and logging every now and then
if block_number % 1_000 == 0 {
next_db.key_value().write_buffered(batch);
next_chain.commit();
@ -479,16 +479,12 @@ impl Service {
let guard = Guard::new(temp_dir.clone());
let res = client.take_snapshot(writer, BlockId::Number(num), &self.progress);
self.taking_snapshot.store(false, Ordering::SeqCst);
if let Err(e) = res {
if client.chain_info().best_block_number >= num + client.pruning_history() {
// "Cancelled" is mincing words a bit -- what really happened
// is that the state we were snapshotting got pruned out
// before we could finish.
info!("Periodic snapshot failed: block state pruned.\
Run with a longer `--pruning-history` or with `--no-periodic-snapshot`");
return Ok(())
// The state we were snapshotting was pruned before we could finish.
info!("Periodic snapshot failed: block state pruned. Run with a longer `--pruning-history` or with `--no-periodic-snapshot`");
return Err(e);
} else {
return Err(e);
}
@ -846,14 +842,29 @@ impl SnapshotService for Service {
}
}
fn abort_snapshot(&self) {
if self.taking_snapshot.load(Ordering::SeqCst) {
trace!(target: "snapshot", "Aborting snapshot Snapshot under way");
self.progress.abort.store(true, Ordering::SeqCst);
}
}
fn shutdown(&self) {
trace!(target: "snapshot", "Shut down SnapshotService");
self.abort_restore();
trace!(target: "snapshot", "Shut down SnapshotService - restore aborted");
self.abort_snapshot();
trace!(target: "snapshot", "Shut down SnapshotService - snapshot aborted");
}
}
impl Drop for Service {
fn drop(&mut self) {
trace!(target: "shutdown", "Dropping Service");
self.abort_restore();
trace!(target: "shutdown", "Dropping Service - restore aborted");
self.abort_snapshot();
trace!(target: "shutdown", "Dropping Service - snapshot aborted");
}
}

View File

@ -188,14 +188,15 @@ fn keep_ancient_blocks() {
&state_root,
&writer,
&Progress::default(),
None
None,
0
).unwrap();
let manifest = ::snapshot::ManifestData {
version: 2,
state_hashes: state_hashes,
state_root: state_root,
block_hashes: block_hashes,
state_hashes,
state_root,
block_hashes,
block_number: NUM_BLOCKS,
block_hash: best_hash,
};

View File

@ -55,7 +55,7 @@ fn snap_and_restore() {
let mut state_hashes = Vec::new();
for part in 0..SNAPSHOT_SUBPARTS {
let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), Some(part)).unwrap();
let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), Some(part), 0).unwrap();
state_hashes.append(&mut hashes);
}
@ -126,8 +126,8 @@ fn get_code_from_prev_chunk() {
let mut make_chunk = |acc, hash| {
let mut db = journaldb::new_memory_db();
AccountDBMut::from_hash(&mut db, hash).insert(&code[..]);
let fat_rlp = account::to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value()).unwrap();
let p = Progress::default();
let fat_rlp = account::to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value(), &p).unwrap();
let mut stream = RlpStream::new_list(1);
stream.append_raw(&fat_rlp[0], 1);
stream.out()
@ -171,13 +171,13 @@ fn checks_flag() {
let state_root = producer.state_root();
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), None).unwrap();
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), None, 0).unwrap();
writer.into_inner().finish(::snapshot::ManifestData {
version: 2,
state_hashes: state_hashes,
state_hashes,
block_hashes: Vec::new(),
state_root: state_root,
state_root,
block_number: 0,
block_hash: H256::default(),
}).unwrap();

View File

@ -55,6 +55,9 @@ pub trait SnapshotService : Sync + Send {
/// no-op if currently restoring.
fn restore_block_chunk(&self, hash: H256, chunk: Bytes);
/// Abort in-progress snapshotting if there is one.
fn abort_snapshot(&self);
/// Shutdown the Snapshot Service by aborting any ongoing restore
fn shutdown(&self);
}

View File

@ -28,7 +28,7 @@ use types::view;
use types::views::BlockView;
use block::IsBlock;
use client::{BlockChainClient, Client, ClientConfig, BlockId, ChainInfo, BlockInfo, PrepareOpenBlock, ImportSealedBlock, ImportBlock};
use client::{BlockChainClient, BlockChainReset, Client, ClientConfig, BlockId, ChainInfo, BlockInfo, PrepareOpenBlock, ImportSealedBlock, ImportBlock};
use ethereum;
use executive::{Executive, TransactOptions};
use miner::{Miner, PendingOrdering, MinerService};
@ -367,3 +367,23 @@ fn transaction_proof() {
assert_eq!(state.balance(&Address::default()).unwrap(), 5.into());
assert_eq!(state.balance(&address).unwrap(), 95.into());
}
#[test]
fn reset_blockchain() {
let client = get_test_client_with_blocks(get_good_dummy_block_seq(19));
// 19 + genesis block
assert!(client.block_header(BlockId::Number(20)).is_some());
assert_eq!(client.block_header(BlockId::Number(20)).unwrap().hash(), client.best_block_header().hash());
assert!(client.reset(5).is_ok());
client.chain().clear_cache();
assert!(client.block_header(BlockId::Number(20)).is_none());
assert!(client.block_header(BlockId::Number(19)).is_none());
assert!(client.block_header(BlockId::Number(18)).is_none());
assert!(client.block_header(BlockId::Number(17)).is_none());
assert!(client.block_header(BlockId::Number(16)).is_none());
assert!(client.block_header(BlockId::Number(15)).is_some());
}

View File

@ -40,7 +40,6 @@ use types::{BlockNumber, header::Header};
use types::transaction::SignedTransaction;
use verification::queue::kind::blocks::Unverified;
#[cfg(not(time_checked_add))]
use time_utils::CheckedSystemTime;
/// Preprocessed block data gathered in `verify_block_unordered` call
@ -310,7 +309,7 @@ pub fn verify_header_params(header: &Header, engine: &EthEngine, is_full: bool,
// this will resist overflow until `year 2037`
let max_time = SystemTime::now() + ACCEPTABLE_DRIFT;
let invalid_threshold = max_time + ACCEPTABLE_DRIFT * 9;
let timestamp = UNIX_EPOCH.checked_add(Duration::from_secs(header.timestamp()))
let timestamp = CheckedSystemTime::checked_add(UNIX_EPOCH, Duration::from_secs(header.timestamp()))
.ok_or(BlockError::TimestampOverflow)?;
if timestamp > invalid_threshold {
@ -334,9 +333,9 @@ fn verify_parent(header: &Header, parent: &Header, engine: &EthEngine) -> Result
if !engine.is_timestamp_valid(header.timestamp(), parent.timestamp()) {
let now = SystemTime::now();
let min = now.checked_add(Duration::from_secs(parent.timestamp().saturating_add(1)))
let min = CheckedSystemTime::checked_add(now, Duration::from_secs(parent.timestamp().saturating_add(1)))
.ok_or(BlockError::TimestampOverflow)?;
let found = now.checked_add(Duration::from_secs(header.timestamp()))
let found = CheckedSystemTime::checked_add(now, Duration::from_secs(header.timestamp()))
.ok_or(BlockError::TimestampOverflow)?;
return Err(From::from(BlockError::InvalidTimestamp(OutOfBounds { max: None, min: Some(min), found })))
}

View File

@ -122,6 +122,8 @@ impl SnapshotService for TestSnapshotService {
self.block_restoration_chunks.lock().clear();
}
fn abort_snapshot(&self) {}
fn restore_state_chunk(&self, hash: H256, chunk: Bytes) {
if self.restoration_manifest.lock().as_ref().map_or(false, |m| m.state_hashes.iter().any(|h| h == &hash)) {
self.state_restoration_chunks.lock().insert(hash, chunk);

View File

@ -932,7 +932,7 @@ impl Configuration {
no_periodic: self.args.flag_no_periodic_snapshot,
processing_threads: match self.args.arg_snapshot_threads {
Some(threads) if threads > 0 => threads,
_ => ::std::cmp::max(1, num_cpus::get() / 2),
_ => ::std::cmp::max(1, num_cpus::get_physical() / 2),
},
};

View File

@ -15,7 +15,6 @@
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! Ethcore client application.
#![warn(missing_docs)]
extern crate ansi_term;

View File

@ -252,6 +252,7 @@ pub struct FullDependencies {
pub gas_price_percentile: usize,
pub poll_lifetime: u32,
pub allow_missing_blocks: bool,
pub no_ancient_blocks: bool,
}
impl FullDependencies {
@ -303,6 +304,7 @@ impl FullDependencies {
gas_price_percentile: self.gas_price_percentile,
allow_missing_blocks: self.allow_missing_blocks,
allow_experimental_rpcs: self.experimental_rpcs,
no_ancient_blocks: self.no_ancient_blocks
}
);
handler.extend_with(client.to_delegate());

View File

@ -753,6 +753,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
gas_price_percentile: cmd.gas_price_percentile,
poll_lifetime: cmd.poll_lifetime,
allow_missing_blocks: cmd.allow_missing_blocks,
no_ancient_blocks: !cmd.download_old_blocks,
});
let dependencies = rpc::Dependencies {
@ -903,17 +904,27 @@ impl RunningClient {
// Create a weak reference to the client so that we can wait on shutdown
// until it is dropped
let weak_client = Arc::downgrade(&client);
// Shutdown and drop the ServiceClient
// Shutdown and drop the ClientService
client_service.shutdown();
trace!(target: "shutdown", "ClientService shut down");
drop(client_service);
trace!(target: "shutdown", "ClientService dropped");
// drop this stuff as soon as exit detected.
drop(rpc);
trace!(target: "shutdown", "RPC dropped");
drop(keep_alive);
trace!(target: "shutdown", "KeepAlive dropped");
// to make sure timer does not spawn requests while shutdown is in progress
informant.shutdown();
trace!(target: "shutdown", "Informant shut down");
// just Arc is dropping here, to allow other reference release in its default time
drop(informant);
trace!(target: "shutdown", "Informant dropped");
drop(client);
trace!(target: "shutdown", "Client dropped");
// This may help when debugging ref cycles. Requires nightly-only `#![feature(weak_counts)]`
// trace!(target: "shutdown", "Waiting for refs to Client to shutdown, strong_count={:?}, weak_count={:?}", weak_client.strong_count(), weak_client.weak_count());
trace!(target: "shutdown", "Waiting for refs to Client to shutdown");
wait_for_drop(weak_client);
}
}
@ -947,24 +958,30 @@ fn print_running_environment(data_dir: &str, dirs: &Directories, db_dirs: &Datab
}
fn wait_for_drop<T>(w: Weak<T>) {
let sleep_duration = Duration::from_secs(1);
let warn_timeout = Duration::from_secs(60);
let max_timeout = Duration::from_secs(300);
const SLEEP_DURATION: Duration = Duration::from_secs(1);
const WARN_TIMEOUT: Duration = Duration::from_secs(60);
const MAX_TIMEOUT: Duration = Duration::from_secs(300);
let instant = Instant::now();
let mut warned = false;
while instant.elapsed() < max_timeout {
while instant.elapsed() < MAX_TIMEOUT {
if w.upgrade().is_none() {
return;
}
if !warned && instant.elapsed() > warn_timeout {
if !warned && instant.elapsed() > WARN_TIMEOUT {
warned = true;
warn!("Shutdown is taking longer than expected.");
}
thread::sleep(sleep_duration);
thread::sleep(SLEEP_DURATION);
// When debugging shutdown issues on a nightly build it can help to enable this with the
// `#![feature(weak_counts)]` added to lib.rs (TODO: enable when
// https://github.com/rust-lang/rust/issues/57977 is stable)
// trace!(target: "shutdown", "Waiting for client to drop, strong_count={:?}, weak_count={:?}", w.strong_count(), w.weak_count());
trace!(target: "shutdown", "Waiting for client to drop");
}
warn!("Shutdown timeout reached, exiting uncleanly.");

View File

@ -261,7 +261,7 @@ impl SnapshotCommand {
let cur_size = p.size();
if cur_size != last_size {
last_size = cur_size;
let bytes = ::informant::format_bytes(p.size());
let bytes = ::informant::format_bytes(cur_size as usize);
info!("Snapshot: {} accounts {} blocks {}", p.accounts(), p.blocks(), bytes);
}

View File

@ -29,6 +29,7 @@ use light::on_demand::error::{Error as OnDemandError, ErrorKind as OnDemandError
use ethcore::client::BlockChainClient;
use types::blockchain_info::BlockChainInfo;
use v1::types::BlockNumber;
use v1::impls::EthClientOptions;
mod codes {
// NOTE [ToDr] Codes from [-32099, -32000]
@ -221,18 +222,34 @@ pub fn cannot_submit_work(err: EthcoreError) -> Error {
}
}
pub fn unavailable_block() -> Error {
Error {
code: ErrorCode::ServerError(codes::UNSUPPORTED_REQUEST),
message: "Ancient block sync is still in progress".into(),
data: None,
pub fn unavailable_block(no_ancient_block: bool, by_hash: bool) -> Error {
if no_ancient_block {
Error {
code: ErrorCode::ServerError(codes::UNSUPPORTED_REQUEST),
message: "Looks like you disabled ancient block download, unfortunately the information you're \
trying to fetch doesn't exist in the db and is probably in the ancient blocks.".into(),
data: None,
}
} else if by_hash {
Error {
code: ErrorCode::ServerError(codes::UNSUPPORTED_REQUEST),
message: "Block information is incomplete while ancient block sync is still in progress, before \
it's finished we can't determine the existence of requested item.".into(),
data: None,
}
} else {
Error {
code: ErrorCode::ServerError(codes::UNSUPPORTED_REQUEST),
message: "Requested block number is in a range that is not available yet, because the ancient block sync is still in progress.".into(),
data: None,
}
}
}
pub fn check_block_number_existence<'a, T, C>(
client: &'a C,
num: BlockNumber,
allow_missing_blocks: bool,
options: EthClientOptions,
) ->
impl Fn(Option<T>) -> RpcResult<Option<T>> + 'a
where C: BlockChainClient,
@ -242,8 +259,8 @@ pub fn check_block_number_existence<'a, T, C>(
if let BlockNumber::Num(block_number) = num {
// tried to fetch block number and got nothing even though the block number is
// less than the latest block number
if block_number < client.chain_info().best_block_number && !allow_missing_blocks {
return Err(unavailable_block());
if block_number < client.chain_info().best_block_number && !options.allow_missing_blocks {
return Err(unavailable_block(options.no_ancient_blocks, false));
}
}
}
@ -253,22 +270,17 @@ pub fn check_block_number_existence<'a, T, C>(
pub fn check_block_gap<'a, T, C>(
client: &'a C,
allow_missing_blocks: bool,
options: EthClientOptions,
) -> impl Fn(Option<T>) -> RpcResult<Option<T>> + 'a
where C: BlockChainClient,
{
move |response| {
if response.is_none() && !allow_missing_blocks {
if response.is_none() && !options.allow_missing_blocks {
let BlockChainInfo { ancient_block_hash, .. } = client.chain_info();
// block information was requested, but unfortunately we couldn't find it and there
// are gaps in the database ethcore/src/blockchain/blockchain.rs
if ancient_block_hash.is_some() {
return Err(Error {
code: ErrorCode::ServerError(codes::UNSUPPORTED_REQUEST),
message: "Block information is incomplete while ancient block sync is still in progress, before \
it's finished we can't determine the existence of requested item.".into(),
data: None,
})
return Err(unavailable_block(options.no_ancient_blocks, true))
}
}
Ok(response)

View File

@ -54,6 +54,7 @@ use v1::metadata::Metadata;
const EXTRA_INFO_PROOF: &str = "Object exists in blockchain (fetched earlier), extra_info is always available if object exists; qed";
/// Eth RPC options
#[derive(Copy, Clone)]
pub struct EthClientOptions {
/// Return nonce from transaction queue when pending block not available.
pub pending_nonce_from_queue: bool,
@ -68,6 +69,8 @@ pub struct EthClientOptions {
pub allow_missing_blocks: bool,
/// Enable Experimental RPC-Calls
pub allow_experimental_rpcs: bool,
/// flag for ancient block sync
pub no_ancient_blocks: bool,
}
impl EthClientOptions {
@ -89,6 +92,7 @@ impl Default for EthClientOptions {
gas_price_percentile: 50,
allow_missing_blocks: false,
allow_experimental_rpcs: false,
no_ancient_blocks: false,
}
}
}
@ -674,7 +678,7 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM, T: StateInfo + 'static> Eth for EthClient<
let trx_count = self.client.block(BlockId::Hash(hash.into()))
.map(|block| block.transactions_count().into());
let result = Ok(trx_count)
.and_then(errors::check_block_gap(&*self.client, self.options.allow_missing_blocks));
.and_then(errors::check_block_gap(&*self.client, self.options));
Box::new(future::done(result))
}
@ -689,7 +693,7 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM, T: StateInfo + 'static> Eth for EthClient<
.and_then(errors::check_block_number_existence(
&*self.client,
num,
self.options.allow_missing_blocks
self.options
))
}
}))
@ -699,7 +703,7 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM, T: StateInfo + 'static> Eth for EthClient<
let uncle_count = self.client.block(BlockId::Hash(hash.into()))
.map(|block| block.uncles_count().into());
let result = Ok(uncle_count)
.and_then(errors::check_block_gap(&*self.client, self.options.allow_missing_blocks));
.and_then(errors::check_block_gap(&*self.client, self.options));
Box::new(future::done(result))
}
@ -713,7 +717,7 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM, T: StateInfo + 'static> Eth for EthClient<
.and_then(errors::check_block_number_existence(
&*self.client,
num,
self.options.allow_missing_blocks
self.options
))
}
}))
@ -734,14 +738,14 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM, T: StateInfo + 'static> Eth for EthClient<
}
fn block_by_hash(&self, hash: H256, include_txs: bool) -> BoxFuture<Option<RichBlock>> {
let result = self.rich_block(BlockId::Hash(hash.into()).into(), include_txs)
.and_then(errors::check_block_gap(&*self.client, self.options.allow_missing_blocks));
let result = self.rich_block(BlockId::Hash(hash).into(), include_txs)
.and_then(errors::check_block_gap(&*self.client, self.options));
Box::new(future::done(result))
}
fn block_by_number(&self, num: BlockNumber, include_txs: bool) -> BoxFuture<Option<RichBlock>> {
let result = self.rich_block(num.clone().into(), include_txs).and_then(
errors::check_block_number_existence(&*self.client, num, self.options.allow_missing_blocks));
errors::check_block_number_existence(&*self.client, num, self.options));
Box::new(future::done(result))
}
@ -751,14 +755,14 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM, T: StateInfo + 'static> Eth for EthClient<
.map(|t| Transaction::from_pending(t.pending().clone()))
});
let result = Ok(tx).and_then(
errors::check_block_gap(&*self.client, self.options.allow_missing_blocks));
errors::check_block_gap(&*self.client, self.options));
Box::new(future::done(result))
}
fn transaction_by_block_hash_and_index(&self, hash: H256, index: Index) -> BoxFuture<Option<Transaction>> {
let id = PendingTransactionId::Location(PendingOrBlock::Block(BlockId::Hash(hash.into())), index.value());
let result = self.transaction(id).and_then(
errors::check_block_gap(&*self.client, self.options.allow_missing_blocks));
errors::check_block_gap(&*self.client, self.options));
Box::new(future::done(result))
}
@ -772,7 +776,7 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM, T: StateInfo + 'static> Eth for EthClient<
let transaction_id = PendingTransactionId::Location(block_id, index.value());
let result = self.transaction(transaction_id).and_then(
errors::check_block_number_existence(&*self.client, num, self.options.allow_missing_blocks));
errors::check_block_number_existence(&*self.client, num, self.options));
Box::new(future::done(result))
}
@ -786,7 +790,7 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM, T: StateInfo + 'static> Eth for EthClient<
let receipt = self.client.transaction_receipt(TransactionId::Hash(hash));
let result = Ok(receipt.map(Into::into))
.and_then(errors::check_block_gap(&*self.client, self.options.allow_missing_blocks));
.and_then(errors::check_block_gap(&*self.client, self.options));
Box::new(future::done(result))
}
@ -794,7 +798,7 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM, T: StateInfo + 'static> Eth for EthClient<
let result = self.uncle(PendingUncleId {
id: PendingOrBlock::Block(BlockId::Hash(hash.into())),
position: index.value()
}).and_then(errors::check_block_gap(&*self.client, self.options.allow_missing_blocks));
}).and_then(errors::check_block_gap(&*self.client, self.options));
Box::new(future::done(result))
}
@ -811,7 +815,7 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM, T: StateInfo + 'static> Eth for EthClient<
.and_then(errors::check_block_number_existence(
&*self.client,
num,
self.options.allow_missing_blocks
self.options
));
Box::new(future::done(result))

View File

@ -143,7 +143,8 @@ impl EthTester {
send_block_number_in_get_work: true,
gas_price_percentile: 50,
allow_experimental_rpcs: true,
allow_missing_blocks: false
allow_missing_blocks: false,
no_ancient_blocks: false
},
);

View File

@ -48,6 +48,7 @@ impl SnapshotService for TestSnapshotService {
fn status(&self) -> RestorationStatus { self.status.lock().clone() }
fn begin_restore(&self, _manifest: ManifestData) { }
fn abort_restore(&self) { }
fn abort_snapshot(&self) {}
fn restore_state_chunk(&self, _hash: H256, _chunk: Bytes) { }
fn restore_block_chunk(&self, _hash: H256, _chunk: Bytes) { }
fn shutdown(&self) { }

View File

@ -3,7 +3,9 @@
set -e # fail on any error
VERSION=$(cat ./tools/VERSION)
TRACK=$(cat ./tools/TRACK)
echo "Parity Ethereum version = ${VERSION}"
echo "Parity Ethereum track = ${TRACK}"
test "$Docker_Hub_User_Parity" -a "$Docker_Hub_Pass_Parity" \
|| ( echo "no docker credentials provided"; exit 1 )
@ -44,6 +46,14 @@ case "${SCHEDULE_TAG:-${CI_COMMIT_REF_NAME}}" in
--file tools/Dockerfile .;
docker push "parity/parity:${VERSION}-${CI_COMMIT_REF_NAME}";
docker push "parity/parity:stable";;
v[0-9]*.[0-9]*)
echo "Docker TAG - 'parity/parity:${VERSION}-${TRACK}'"
docker build --no-cache \
--build-arg VCS_REF="${CI_COMMIT_SHA}" \
--build-arg BUILD_DATE="$(date -u '+%Y-%m-%dT%H:%M:%SZ')" \
--tag "parity/parity:${VERSION}-${TRACK}" \
--file tools/Dockerfile .;
docker push "parity/parity:${VERSION}-${TRACK}";;
*)
echo "Docker TAG - 'parity/parity:${VERSION}-${CI_COMMIT_REF_NAME}'"
docker build --no-cache \

View File

@ -168,7 +168,6 @@ pub struct Discovery<'a> {
discovery_id: NodeId,
discovery_nodes: HashSet<NodeId>,
node_buckets: Vec<NodeBucket>,
// Sometimes we don't want to add nodes to the NodeTable, but still want to
// keep track of them to avoid excessive pinging (happens when an unknown node sends
// a discovery request to us -- the node might be on a different net).
@ -257,7 +256,7 @@ impl<'a> Discovery<'a> {
Ok(()) => None,
Err(BucketError::Ourselves) => None,
Err(BucketError::NotInTheBucket{node_entry, bucket_distance}) => Some((node_entry, bucket_distance))
}.map(|(node_entry, bucket_distance)| {
}.and_then(|(node_entry, bucket_distance)| {
trace!(target: "discovery", "Adding a new node {:?} into our bucket {}", &node_entry, bucket_distance);
let mut added = HashMap::with_capacity(1);
@ -265,7 +264,7 @@ impl<'a> Discovery<'a> {
let node_to_ping = {
let bucket = &mut self.node_buckets[bucket_distance];
bucket.nodes.push_front(BucketEntry::new(node_entry));
bucket.nodes.push_front(BucketEntry::new(node_entry.clone()));
if bucket.nodes.len() > BUCKET_SIZE {
select_bucket_ping(bucket.nodes.iter())
} else {
@ -275,7 +274,12 @@ impl<'a> Discovery<'a> {
if let Some(node) = node_to_ping {
self.try_ping(node, PingReason::Default);
};
TableUpdates{added, removed: HashSet::new()}
if node_entry.endpoint.is_valid_sync_node() {
Some(TableUpdates { added, removed: HashSet::new() })
} else {
None
}
})
}
@ -518,7 +522,18 @@ impl<'a> Discovery<'a> {
fn on_ping(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr, echo_hash: &[u8]) -> Result<Option<TableUpdates>, Error> {
trace!(target: "discovery", "Got Ping from {:?}", &from);
let ping_from = NodeEndpoint::from_rlp(&rlp.at(1)?)?;
let ping_from = if let Ok(node_endpoint) = NodeEndpoint::from_rlp(&rlp.at(1)?) {
node_endpoint
} else {
let mut address = from.clone();
// address here is the node's tcp port. If we are unable to get the `NodeEndpoint` from the `ping_from`
// rlp field then this is most likely a BootNode, set the tcp port to 0 because it can not be used for syncing.
address.set_port(0);
NodeEndpoint {
address,
udp_port: from.port()
}
};
let ping_to = NodeEndpoint::from_rlp(&rlp.at(2)?)?;
let timestamp: u64 = rlp.val_at(3)?;
self.check_timestamp(timestamp)?;
@ -540,7 +555,7 @@ impl<'a> Discovery<'a> {
self.send_packet(PACKET_PONG, from, &response.drain())?;
let entry = NodeEntry { id: *node_id, endpoint: pong_to.clone() };
if !entry.endpoint.is_valid() {
if !entry.endpoint.is_valid_discovery_node() {
debug!(target: "discovery", "Got bad address: {:?}", entry);
} else if !self.is_allowed(&entry) {
debug!(target: "discovery", "Address not allowed: {:?}", entry);
@ -728,7 +743,7 @@ impl<'a> Discovery<'a> {
trace!(target: "discovery", "Got {} Neighbours from {:?}", results_count, &from);
for r in rlp.at(0)?.iter() {
let endpoint = NodeEndpoint::from_rlp(&r)?;
if !endpoint.is_valid() {
if !endpoint.is_valid_discovery_node() {
debug!(target: "discovery", "Bad address: {:?}", endpoint);
continue;
}

View File

@ -103,10 +103,16 @@ impl NodeEndpoint {
self.to_rlp(rlp);
}
/// Validates that the port is not 0 and address IP is specified
pub fn is_valid(&self) -> bool {
self.udp_port != 0 && self.address.port() != 0 &&
match self.address {
/// Validates that the tcp port is not 0 and that the node is a valid discovery node (i.e. `is_valid_discovery_node()` is true).
/// Sync happens over tcp.
pub fn is_valid_sync_node(&self) -> bool {
self.is_valid_discovery_node() && self.address.port() != 0
}
/// Validates that the udp port is not 0 and address IP is specified.
/// Peer discovery happens over udp.
pub fn is_valid_discovery_node(&self) -> bool {
self.udp_port != 0 && match self.address {
SocketAddr::V4(a) => !a.ip().is_unspecified(),
SocketAddr::V6(a) => !a.ip().is_unspecified()
}

View File

@ -3,7 +3,7 @@
[package]
name = "parity-version"
# NOTE: this value is used for Parity Ethereum version string (via env CARGO_PKG_VERSION)
version = "2.4.7"
version = "2.4.8"
authors = ["Parity Technologies <admin@parity.io>"]
build = "build.rs"