Backports for 2.1.0 beta (#9518)

* parity-version: mark 2.1.0 track beta

* ci: update branch version references

* docker: release master to latest

* Fix checkpointing when creating contract failed (#9514)

* ci: fix json docs generation (#9515)

* fix typo in version string (#9516)

* Update patricia trie to 0.2.2 crates. Default dependencies on minor
version only.

* Putting back ethereum tests to the right commit

* Enable all Constantinople hard fork changes in constantinople_test.json (#9505)

* Enable all Constantinople hard fork changes in constantinople_test.json

* Address grumbles

* Remove EIP-210 activation

* 8m -> 5m

* Temporarily add back eip210 transition so we can get test passed

* Add eip210_test and remove eip210 transition from const_test

* In create memory calculation is the same for create2 because the additional parameter was popped before. (#9522)

* deps: bump fs-swap and kvdb-rocksdb

* Multithreaded snapshot creation (#9239)

* Add Progress to Snapshot Secondary chunks creation

* Use half of CPUs to multithread snapshot creation

* Use env var to define number of threads

* info to debug logs

* Add Snapshot threads as CLI option

* Randomize chunks per thread

* Remove randomness, add debugging

* Add warning

* Add tracing

* Use parity-common fix seek branch

* Fix log

* Fix tests

* Fix tests

* PR Grumbles

* PR Grumble II

* Update Cargo.lock

* PR Grumbles

* Default snapshot threads to half number of CPUs

* Fix default snapshot threads // min 1

* correct before_script for nightly build versions (#9543)

- fix gitlab array of strings syntax error
- get proper commit id
- avoid colon in stings

* Remove initial token for WS. (#9545)

* version: mark release critical

* ci: fix rpc docs generation 2 (#9550)

* Improve P2P discovery (#9526)

* Add `target` to Rust traces

* network-devp2p: Don't remove discovery peer in main sync

* network-p2p: Refresh discovery more often

* Update Peer discovery protocol

* Run discovery more often when not enough nodes connected

* Start the first discovery early

* Update fast discovery rate

* Fix tests

* Fix `ping` tests

* Fixing remote Node address ; adding PingPong round

* Fix tests: update new +1 PingPong round

* Increase slow Discovery rate
Check in flight FindNode before pings

* Add `deprecated` to deprecated_echo_hash

* Refactor `discovery_round` branching

* net_version caches network_id to avoid redundant aquire of sync read lock (#9544)

* net_version caches network_id to avoid redundant aquire of sync read lock, #8746

* use lower_hex display formatting for net_peerCount rpc method

* Increase Gas-floor-target and Gas Cap (#9564)

+ Gas-floor-target increased to 8M by default

+ Gas-cap increased to 10M by default

* Revert to old parity-tokio-ipc.

* Downgrade named pipes.
This commit is contained in:
Afri Schoedon 2018-09-17 19:22:30 +02:00 committed by GitHub
parent 631df0fe56
commit d147700046
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 834 additions and 598 deletions

View File

@ -41,17 +41,14 @@ cache:
paths: paths:
- artifacts/ - artifacts/
.determine_version: .determine_version: &determine_version
before_script: &determine_version - VERSION="$(sed -r -n '1,/^version/s/^version = "([^"]+)".*$/\1/p' Cargo.toml)"
- > - DATE_STR="$(date +%Y%m%d)"
VERSION="$(sed -r -n '1,/^version/s/^version = "([^"]+)".*$/\1/p' < Cargo.toml)"; - ID_SHORT="$(echo ${CI_COMMIT_SHA} | cut -c 1-7)"
if [ "${CI_COMMIT_REF_NAME}" = "nightly" ]; then - test "${CI_COMMIT_REF_NAME}" = "nightly" && VERSION="${VERSION}-${ID_SHORT}-${DATE_STR}"
COMMIT_REF_SHORT="echo ${CI_COMMIT_REF} | grep -oE '^.{7}')"; - export VERSION
DATE_STRING="$(date +%Y%m%d)"; - echo "Version = ${VERSION}"
export VERSION="${VERSION}-${COMMIT_REF_SHORT}-${DATE_STRING}";
fi;
export VERSION;
echo "Version: $VERSION"
#### stage: test #### stage: test
@ -256,7 +253,7 @@ publish-linux-snap-armhf:
publish-docker-parity-amd64: &publish_docker publish-docker-parity-amd64: &publish_docker
stage: publish stage: publish
only: *publishable_branches only: *releaseable_branches
cache: {} cache: {}
dependencies: dependencies:
- build-linux-ubuntu-amd64 - build-linux-ubuntu-amd64

644
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -18,7 +18,7 @@ ethcore-bloom-journal = { path = "../util/bloom" }
parity-bytes = "0.1" parity-bytes = "0.1"
hashdb = "0.2.1" hashdb = "0.2.1"
memorydb = "0.2.1" memorydb = "0.2.1"
patricia-trie = "0.2.1" patricia-trie = "0.2"
patricia-trie-ethereum = { path = "../util/patricia-trie-ethereum" } patricia-trie-ethereum = { path = "../util/patricia-trie-ethereum" }
parity-crypto = "0.1" parity-crypto = "0.1"
error-chain = { version = "0.12", default-features = false } error-chain = { version = "0.12", default-features = false }

View File

@ -233,11 +233,7 @@ impl<Gas: evm::CostType> Gasometer<Gas> {
}, },
instructions::CREATE | instructions::CREATE2 => { instructions::CREATE | instructions::CREATE2 => {
let gas = Gas::from(schedule.create_gas); let gas = Gas::from(schedule.create_gas);
let mem = match instruction { let mem = mem_needed(stack.peek(1), stack.peek(2))?;
instructions::CREATE => mem_needed(stack.peek(1), stack.peek(2))?,
instructions::CREATE2 => mem_needed(stack.peek(2), stack.peek(3))?,
_ => unreachable!("instruction can only be CREATE/CREATE2 checked above; qed"),
};
Request::GasMemProvide(gas, mem, None) Request::GasMemProvide(gas, mem, None)
}, },

View File

@ -13,7 +13,7 @@ parity-bytes = "0.1"
ethcore-transaction = { path = "../transaction" } ethcore-transaction = { path = "../transaction" }
ethereum-types = "0.4" ethereum-types = "0.4"
memorydb = "0.2.1" memorydb = "0.2.1"
patricia-trie = "0.2.1" patricia-trie = "0.2"
patricia-trie-ethereum = { path = "../../util/patricia-trie-ethereum" } patricia-trie-ethereum = { path = "../../util/patricia-trie-ethereum" }
ethcore-network = { path = "../../util/network" } ethcore-network = { path = "../../util/network" }
ethcore-io = { path = "../../util/io" } ethcore-io = { path = "../../util/io" }

View File

@ -26,7 +26,7 @@ heapsize = "0.4"
keccak-hash = "0.1.2" keccak-hash = "0.1.2"
log = "0.4" log = "0.4"
parking_lot = "0.6" parking_lot = "0.6"
patricia-trie = "0.2.1" patricia-trie = "0.2"
patricia-trie-ethereum = { path = "../../util/patricia-trie-ethereum" } patricia-trie-ethereum = { path = "../../util/patricia-trie-ethereum" }
rand = "0.3" rand = "0.3"
rlp = { version = "0.2.4", features = ["ethereum"] } rlp = { version = "0.2.4", features = ["ethereum"] }

View File

@ -1,16 +1,16 @@
{ {
"name": "Byzantium (Test)", "name": "Constantinople (test)",
"engine": { "engine": {
"Ethash": { "Ethash": {
"params": { "params": {
"minimumDifficulty": "0x020000", "minimumDifficulty": "0x020000",
"difficultyBoundDivisor": "0x0800", "difficultyBoundDivisor": "0x0800",
"durationLimit": "0x0d", "durationLimit": "0x0d",
"blockReward": "0x29A2241AF62C0000", "blockReward": "0x1BC16D674EC80000",
"homesteadTransition": "0x0", "homesteadTransition": "0x0",
"eip100bTransition": "0x0", "eip100bTransition": "0x0",
"difficultyBombDelays": { "difficultyBombDelays": {
"0": 3000000 "0": 5000000
} }
} }
} }
@ -30,11 +30,13 @@
"eip161abcTransition": "0x0", "eip161abcTransition": "0x0",
"eip161dTransition": "0x0", "eip161dTransition": "0x0",
"eip140Transition": "0x0", "eip140Transition": "0x0",
"eip210Transition": "0x0",
"eip211Transition": "0x0", "eip211Transition": "0x0",
"eip214Transition": "0x0", "eip214Transition": "0x0",
"eip155Transition": "0x0", "eip155Transition": "0x0",
"eip658Transition": "0x0", "eip658Transition": "0x0",
"eip145Transition": "0x0",
"eip1014Transition": "0x0",
"eip1052Transition": "0x0",
"eip1283Transition": "0x0" "eip1283Transition": "0x0"
}, },
"genesis": { "genesis": {

View File

@ -0,0 +1,54 @@
{
"name": "EIP210 (test)",
"engine": {
"Ethash": {
"params": {
"minimumDifficulty": "0x020000",
"difficultyBoundDivisor": "0x0800",
"durationLimit": "0x0d",
"blockReward": "0x4563918244F40000",
"homesteadTransition": "0x0"
}
}
},
"params": {
"gasLimitBoundDivisor": "0x0400",
"registrar" : "0xc6d9d2cd449a754c494264e1809c50e34d64562b",
"accountStartNonce": "0x00",
"maximumExtraDataSize": "0x20",
"minGasLimit": "0x1388",
"networkID" : "0x1",
"maxCodeSize": 24576,
"maxCodeSizeTransition": "0x0",
"eip98Transition": "0xffffffffffffffff",
"eip150Transition": "0x0",
"eip160Transition": "0x0",
"eip161abcTransition": "0x0",
"eip161dTransition": "0x0",
"eip210Transition": "0x0"
},
"genesis": {
"seal": {
"ethereum": {
"nonce": "0x0000000000000042",
"mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000"
}
},
"difficulty": "0x400000000",
"author": "0x0000000000000000000000000000000000000000",
"timestamp": "0x00",
"parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"extraData": "0x11bbe8db4e347b4e8c937c1c8370e4b5ed33adb3db69cbdb7a38e1e50b1b82fa",
"gasLimit": "0x1388"
},
"accounts": {
"0000000000000000000000000000000000000001": { "balance": "1", "builtin": { "name": "ecrecover", "pricing": { "linear": { "base": 3000, "word": 0 } } } },
"0000000000000000000000000000000000000002": { "balance": "1", "builtin": { "name": "sha256", "pricing": { "linear": { "base": 60, "word": 12 } } } },
"0000000000000000000000000000000000000003": { "balance": "1", "builtin": { "name": "ripemd160", "pricing": { "linear": { "base": 600, "word": 120 } } } },
"0000000000000000000000000000000000000004": { "balance": "1", "builtin": { "name": "identity", "pricing": { "linear": { "base": 15, "word": 3 } } } },
"0000000000000000000000000000000000000005": { "builtin": { "name": "modexp", "activate_at": "0x00", "pricing": { "modexp": { "divisor": 100 } } } },
"0000000000000000000000000000000000000006": { "builtin": { "name": "alt_bn128_add", "activate_at": "0x00", "pricing": { "linear": { "base": 500, "word": 0 } } } },
"0000000000000000000000000000000000000007": { "builtin": { "name": "alt_bn128_mul", "activate_at": "0x00", "pricing": { "linear": { "base": 2000, "word": 0 } } } },
"0000000000000000000000000000000000000008": { "builtin": { "name": "alt_bn128_pairing", "activate_at": "0x00", "pricing": { "alt_bn128_pairing": { "base": 100000, "pair": 80000 } } } }
}
}

View File

@ -1153,7 +1153,8 @@ impl Client {
}, },
}; };
snapshot::take_snapshot(&*self.engine, &self.chain.read(), start_hash, db.as_hashdb(), writer, p)?; let processing_threads = self.config.snapshot.processing_threads;
snapshot::take_snapshot(&*self.engine, &self.chain.read(), start_hash, db.as_hashdb(), writer, p, processing_threads)?;
Ok(()) Ok(())
} }

View File

@ -19,6 +19,7 @@ use std::fmt::{Display, Formatter, Error as FmtError};
use verification::{VerifierType, QueueConfig}; use verification::{VerifierType, QueueConfig};
use journaldb; use journaldb;
use snapshot::SnapshotConfiguration;
pub use std::time::Duration; pub use std::time::Duration;
pub use blockchain::Config as BlockChainConfig; pub use blockchain::Config as BlockChainConfig;
@ -120,6 +121,8 @@ pub struct ClientConfig {
pub check_seal: bool, pub check_seal: bool,
/// Maximal number of transactions queued for verification in a separate thread. /// Maximal number of transactions queued for verification in a separate thread.
pub transaction_verification_queue_size: usize, pub transaction_verification_queue_size: usize,
/// Snapshot configuration
pub snapshot: SnapshotConfiguration,
} }
impl Default for ClientConfig { impl Default for ClientConfig {
@ -144,6 +147,7 @@ impl Default for ClientConfig {
history_mem: 32 * mb, history_mem: 32 * mb,
check_seal: true, check_seal: true,
transaction_verification_queue_size: 8192, transaction_verification_queue_size: 8192,
snapshot: Default::default(),
} }
} }
} }

View File

@ -152,6 +152,9 @@ pub fn new_frontier_test_machine() -> EthereumMachine { load_machine(include_byt
/// Create a new Foundation Homestead-era chain spec as though it never changed from Frontier. /// Create a new Foundation Homestead-era chain spec as though it never changed from Frontier.
pub fn new_homestead_test_machine() -> EthereumMachine { load_machine(include_bytes!("../../res/ethereum/homestead_test.json")) } pub fn new_homestead_test_machine() -> EthereumMachine { load_machine(include_bytes!("../../res/ethereum/homestead_test.json")) }
/// Create a new Foundation Homestead-EIP210-era chain spec as though it never changed from Homestead/Frontier.
pub fn new_eip210_test_machine() -> EthereumMachine { load_machine(include_bytes!("../../res/ethereum/eip210_test.json")) }
/// Create a new Foundation Byzantium era spec. /// Create a new Foundation Byzantium era spec.
pub fn new_byzantium_test_machine() -> EthereumMachine { load_machine(include_bytes!("../../res/ethereum/byzantium_test.json")) } pub fn new_byzantium_test_machine() -> EthereumMachine { load_machine(include_bytes!("../../res/ethereum/byzantium_test.json")) }

View File

@ -30,7 +30,7 @@ use machine::EthereumMachine;
use ids::BlockId; use ids::BlockId;
use header::Header; use header::Header;
use receipt::Receipt; use receipt::Receipt;
use snapshot::{Error, ManifestData}; use snapshot::{Error, ManifestData, Progress};
use itertools::{Position, Itertools}; use itertools::{Position, Itertools};
use rlp::{RlpStream, Rlp}; use rlp::{RlpStream, Rlp};
@ -59,6 +59,7 @@ impl SnapshotComponents for PoaSnapshot {
chain: &BlockChain, chain: &BlockChain,
block_at: H256, block_at: H256,
sink: &mut ChunkSink, sink: &mut ChunkSink,
_progress: &Progress,
preferred_size: usize, preferred_size: usize,
) -> Result<(), Error> { ) -> Result<(), Error> {
let number = chain.block_number(&block_at) let number = chain.block_number(&block_at)

View File

@ -22,7 +22,7 @@ use std::sync::Arc;
use blockchain::{BlockChain, BlockChainDB}; use blockchain::{BlockChain, BlockChainDB};
use engines::EthEngine; use engines::EthEngine;
use snapshot::{Error, ManifestData}; use snapshot::{Error, ManifestData, Progress};
use ethereum_types::H256; use ethereum_types::H256;
@ -49,6 +49,7 @@ pub trait SnapshotComponents: Send {
chain: &BlockChain, chain: &BlockChain,
block_at: H256, block_at: H256,
chunk_sink: &mut ChunkSink, chunk_sink: &mut ChunkSink,
progress: &Progress,
preferred_size: usize, preferred_size: usize,
) -> Result<(), Error>; ) -> Result<(), Error>;

View File

@ -28,7 +28,7 @@ use std::sync::Arc;
use blockchain::{BlockChain, BlockChainDB, BlockProvider}; use blockchain::{BlockChain, BlockChainDB, BlockProvider};
use engines::EthEngine; use engines::EthEngine;
use snapshot::{Error, ManifestData}; use snapshot::{Error, ManifestData, Progress};
use snapshot::block::AbridgedBlock; use snapshot::block::AbridgedBlock;
use ethereum_types::H256; use ethereum_types::H256;
use kvdb::KeyValueDB; use kvdb::KeyValueDB;
@ -65,6 +65,7 @@ impl SnapshotComponents for PowSnapshot {
chain: &BlockChain, chain: &BlockChain,
block_at: H256, block_at: H256,
chunk_sink: &mut ChunkSink, chunk_sink: &mut ChunkSink,
progress: &Progress,
preferred_size: usize, preferred_size: usize,
) -> Result<(), Error> { ) -> Result<(), Error> {
PowWorker { PowWorker {
@ -72,6 +73,7 @@ impl SnapshotComponents for PowSnapshot {
rlps: VecDeque::new(), rlps: VecDeque::new(),
current_hash: block_at, current_hash: block_at,
writer: chunk_sink, writer: chunk_sink,
progress: progress,
preferred_size: preferred_size, preferred_size: preferred_size,
}.chunk_all(self.blocks) }.chunk_all(self.blocks)
} }
@ -96,6 +98,7 @@ struct PowWorker<'a> {
rlps: VecDeque<Bytes>, rlps: VecDeque<Bytes>,
current_hash: H256, current_hash: H256,
writer: &'a mut ChunkSink<'a>, writer: &'a mut ChunkSink<'a>,
progress: &'a Progress,
preferred_size: usize, preferred_size: usize,
} }
@ -138,6 +141,7 @@ impl<'a> PowWorker<'a> {
last = self.current_hash; last = self.current_hash;
self.current_hash = block.header_view().parent_hash(); self.current_hash = block.header_view().parent_hash();
self.progress.blocks.fetch_add(1, Ordering::SeqCst);
} }
if loaded_size != 0 { if loaded_size != 0 {

View File

@ -20,6 +20,7 @@
//! https://wiki.parity.io/Warp-Sync-Snapshot-Format //! https://wiki.parity.io/Warp-Sync-Snapshot-Format
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::cmp;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY}; use hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY};
@ -43,6 +44,7 @@ use trie::{Trie, TrieMut};
use ethtrie::{TrieDB, TrieDBMut}; use ethtrie::{TrieDB, TrieDBMut};
use rlp::{RlpStream, Rlp}; use rlp::{RlpStream, Rlp};
use bloom_journal::Bloom; use bloom_journal::Bloom;
use num_cpus;
use self::io::SnapshotWriter; use self::io::SnapshotWriter;
@ -88,6 +90,28 @@ const MAX_CHUNK_SIZE: usize = PREFERRED_CHUNK_SIZE / 4 * 5;
const MIN_SUPPORTED_STATE_CHUNK_VERSION: u64 = 1; const MIN_SUPPORTED_STATE_CHUNK_VERSION: u64 = 1;
// current state chunk version. // current state chunk version.
const STATE_CHUNK_VERSION: u64 = 2; const STATE_CHUNK_VERSION: u64 = 2;
/// number of snapshot subparts, must be a power of 2 in [1; 256]
const SNAPSHOT_SUBPARTS: usize = 16;
/// Maximum number of snapshot subparts (must be a multiple of `SNAPSHOT_SUBPARTS`)
const MAX_SNAPSHOT_SUBPARTS: usize = 256;
/// Configuration for the Snapshot service
#[derive(Debug, Clone, PartialEq)]
pub struct SnapshotConfiguration {
/// If `true`, no periodic snapshots will be created
pub no_periodic: bool,
/// Number of threads for creating snapshots
pub processing_threads: usize,
}
impl Default for SnapshotConfiguration {
fn default() -> Self {
SnapshotConfiguration {
no_periodic: false,
processing_threads: ::std::cmp::max(1, num_cpus::get() / 2),
}
}
}
/// A progress indicator for snapshots. /// A progress indicator for snapshots.
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -130,7 +154,8 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
block_at: H256, block_at: H256,
state_db: &HashDB<KeccakHasher>, state_db: &HashDB<KeccakHasher>,
writer: W, writer: W,
p: &Progress p: &Progress,
processing_threads: usize,
) -> Result<(), Error> { ) -> Result<(), Error> {
let start_header = chain.block_header_data(&block_at) let start_header = chain.block_header_data(&block_at)
.ok_or(Error::InvalidStartingBlock(BlockId::Hash(block_at)))?; .ok_or(Error::InvalidStartingBlock(BlockId::Hash(block_at)))?;
@ -142,17 +167,45 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
let writer = Mutex::new(writer); let writer = Mutex::new(writer);
let chunker = engine.snapshot_components().ok_or(Error::SnapshotsUnsupported)?; let chunker = engine.snapshot_components().ok_or(Error::SnapshotsUnsupported)?;
let snapshot_version = chunker.current_version(); let snapshot_version = chunker.current_version();
let (state_hashes, block_hashes) = scope(|scope| { let (state_hashes, block_hashes) = scope(|scope| -> Result<(Vec<H256>, Vec<H256>), Error> {
let writer = &writer; let writer = &writer;
let block_guard = scope.spawn(move || chunk_secondary(chunker, chain, block_at, writer, p)); let block_guard = scope.spawn(move || chunk_secondary(chunker, chain, block_at, writer, p));
let state_res = chunk_state(state_db, &state_root, writer, p);
state_res.and_then(|state_hashes| { // The number of threads must be between 1 and SNAPSHOT_SUBPARTS
block_guard.join().map(|block_hashes| (state_hashes, block_hashes)) assert!(processing_threads >= 1, "Cannot use less than 1 threads for creating snapshots");
}) let num_threads: usize = cmp::min(processing_threads, SNAPSHOT_SUBPARTS);
info!(target: "snapshot", "Using {} threads for Snapshot creation.", num_threads);
let mut state_guards = Vec::with_capacity(num_threads as usize);
for thread_idx in 0..num_threads {
let state_guard = scope.spawn(move || -> Result<Vec<H256>, Error> {
let mut chunk_hashes = Vec::new();
for part in (thread_idx..SNAPSHOT_SUBPARTS).step_by(num_threads) {
debug!(target: "snapshot", "Chunking part {} in thread {}", part, thread_idx);
let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part))?;
chunk_hashes.append(&mut hashes);
}
Ok(chunk_hashes)
});
state_guards.push(state_guard);
}
let block_hashes = block_guard.join()?;
let mut state_hashes = Vec::new();
for guard in state_guards {
let part_state_hashes = guard.join()?;
state_hashes.extend(part_state_hashes);
}
debug!(target: "snapshot", "Took a snapshot of {} accounts", p.accounts.load(Ordering::SeqCst));
Ok((state_hashes, block_hashes))
})?; })?;
info!("produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len()); info!(target: "snapshot", "produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len());
let manifest_data = ManifestData { let manifest_data = ManifestData {
version: snapshot_version, version: snapshot_version,
@ -200,6 +253,7 @@ pub fn chunk_secondary<'a>(mut chunker: Box<SnapshotComponents>, chain: &'a Bloc
chain, chain,
start_hash, start_hash,
&mut chunk_sink, &mut chunk_sink,
progress,
PREFERRED_CHUNK_SIZE, PREFERRED_CHUNK_SIZE,
)?; )?;
} }
@ -263,10 +317,12 @@ impl<'a> StateChunker<'a> {
/// Walk the given state database starting from the given root, /// Walk the given state database starting from the given root,
/// creating chunks and writing them out. /// creating chunks and writing them out.
/// `part` is a number between 0 and 15, which describe which part of
/// the tree should be chunked.
/// ///
/// Returns a list of hashes of chunks created, or any error it may /// Returns a list of hashes of chunks created, or any error it may
/// have encountered. /// have encountered.
pub fn chunk_state<'a>(db: &HashDB<KeccakHasher>, root: &H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> { pub fn chunk_state<'a>(db: &HashDB<KeccakHasher>, root: &H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress, part: Option<usize>) -> Result<Vec<H256>, Error> {
let account_trie = TrieDB::new(db, &root)?; let account_trie = TrieDB::new(db, &root)?;
let mut chunker = StateChunker { let mut chunker = StateChunker {
@ -281,11 +337,33 @@ pub fn chunk_state<'a>(db: &HashDB<KeccakHasher>, root: &H256, writer: &Mutex<Sn
let mut used_code = HashSet::new(); let mut used_code = HashSet::new();
// account_key here is the address' hash. // account_key here is the address' hash.
for item in account_trie.iter()? { let mut account_iter = account_trie.iter()?;
let mut seek_to = None;
if let Some(part) = part {
assert!(part < 16, "Wrong chunk state part number (must be <16) in snapshot creation.");
let part_offset = MAX_SNAPSHOT_SUBPARTS / SNAPSHOT_SUBPARTS;
let mut seek_from = vec![0; 32];
seek_from[0] = (part * part_offset) as u8;
account_iter.seek(&seek_from)?;
// Set the upper-bound, except for the last part
if part < SNAPSHOT_SUBPARTS - 1 {
seek_to = Some(((part + 1) * part_offset) as u8)
}
}
for item in account_iter {
let (account_key, account_data) = item?; let (account_key, account_data) = item?;
let account = ::rlp::decode(&*account_data)?;
let account_key_hash = H256::from_slice(&account_key); let account_key_hash = H256::from_slice(&account_key);
if seek_to.map_or(false, |seek_to| account_key[0] >= seek_to) {
break;
}
let account = ::rlp::decode(&*account_data)?;
let account_db = AccountDB::from_hash(db, account_key_hash); let account_db = AccountDB::from_hash(db, account_key_hash);
let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE)?; let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE)?;

View File

@ -22,7 +22,7 @@ use hash::{KECCAK_NULL_RLP, keccak};
use basic_account::BasicAccount; use basic_account::BasicAccount;
use snapshot::account; use snapshot::account;
use snapshot::{chunk_state, Error as SnapshotError, Progress, StateRebuilder}; use snapshot::{chunk_state, Error as SnapshotError, Progress, StateRebuilder, SNAPSHOT_SUBPARTS};
use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter}; use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter};
use super::helpers::{compare_dbs, StateProducer}; use super::helpers::{compare_dbs, StateProducer};
@ -53,7 +53,11 @@ fn snap_and_restore() {
let state_root = producer.state_root(); let state_root = producer.state_root();
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap()); let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default()).unwrap(); let mut state_hashes = Vec::new();
for part in 0..SNAPSHOT_SUBPARTS {
let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), Some(part)).unwrap();
state_hashes.append(&mut hashes);
}
writer.into_inner().finish(::snapshot::ManifestData { writer.into_inner().finish(::snapshot::ManifestData {
version: 2, version: 2,
@ -164,7 +168,7 @@ fn checks_flag() {
let state_root = producer.state_root(); let state_root = producer.state_root();
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap()); let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default()).unwrap(); let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), None).unwrap();
writer.into_inner().finish(::snapshot::ManifestData { writer.into_inner().finish(::snapshot::ManifestData {
version: 2, version: 2,

View File

@ -38,7 +38,7 @@ fn test_blockhash_eip210(factory: Factory) {
let test_blockhash_contract = "73fffffffffffffffffffffffffffffffffffffffe33141561007a57600143036020526000356101006020510755600061010060205107141561005057600035610100610100602051050761010001555b6000620100006020510714156100755760003561010062010000602051050761020001555b61014a565b4360003512151561009057600060405260206040f35b610100600035430312156100b357610100600035075460605260206060f3610149565b62010000600035430312156100d157600061010060003507146100d4565b60005b156100f6576101006101006000350507610100015460805260206080f3610148565b630100000060003543031215610116576000620100006000350714610119565b60005b1561013c57610100620100006000350507610200015460a052602060a0f3610147565b600060c052602060c0f35b5b5b5b5b"; let test_blockhash_contract = "73fffffffffffffffffffffffffffffffffffffffe33141561007a57600143036020526000356101006020510755600061010060205107141561005057600035610100610100602051050761010001555b6000620100006020510714156100755760003561010062010000602051050761020001555b61014a565b4360003512151561009057600060405260206040f35b610100600035430312156100b357610100600035075460605260206060f3610149565b62010000600035430312156100d157600061010060003507146100d4565b60005b156100f6576101006101006000350507610100015460805260206080f3610148565b630100000060003543031215610116576000620100006000350714610119565b60005b1561013c57610100620100006000350507610200015460a052602060a0f3610147565b600060c052602060c0f35b5b5b5b5b";
let blockhash_contract_code = Arc::new(test_blockhash_contract.from_hex().unwrap()); let blockhash_contract_code = Arc::new(test_blockhash_contract.from_hex().unwrap());
let blockhash_contract_code_hash = keccak(blockhash_contract_code.as_ref()); let blockhash_contract_code_hash = keccak(blockhash_contract_code.as_ref());
let machine = ::ethereum::new_constantinople_test_machine(); let machine = ::ethereum::new_eip210_test_machine();
let mut env_info = EnvInfo::default(); let mut env_info = EnvInfo::default();
// populate state with 256 last hashes // populate state with 256 last hashes

View File

@ -200,7 +200,7 @@ impl SyncPropagator {
let appended = packet.append_raw_checked(&transaction.drain(), 1, MAX_TRANSACTION_PACKET_SIZE); let appended = packet.append_raw_checked(&transaction.drain(), 1, MAX_TRANSACTION_PACKET_SIZE);
if !appended { if !appended {
// Maximal packet size reached just proceed with sending // Maximal packet size reached just proceed with sending
debug!("Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len()); debug!(target: "sync", "Transaction packet size limit reached. Sending incomplete set of {}/{} transactions.", pushed, to_send.len());
to_send = to_send.into_iter().take(pushed).collect(); to_send = to_send.into_iter().take(pushed).collect();
break; break;
} }

View File

@ -7,7 +7,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
byteorder = "1.0" byteorder = "1.0"
parity-bytes = "0.1" parity-bytes = "0.1"
ethereum-types = "0.4" ethereum-types = "0.4"
patricia-trie = "0.2.1" patricia-trie = "0.2"
patricia-trie-ethereum = { path = "../../util/patricia-trie-ethereum" } patricia-trie-ethereum = { path = "../../util/patricia-trie-ethereum" }
log = "0.4" log = "0.4"
common-types = { path = "../types" } common-types = { path = "../types" }

View File

@ -704,11 +704,11 @@ usage! {
"--price-update-period=[T]", "--price-update-period=[T]",
"T will be allowed to pass between each gas price update. T may be daily, hourly, a number of seconds, or a time string of the form \"2 days\", \"30 minutes\" etc..", "T will be allowed to pass between each gas price update. T may be daily, hourly, a number of seconds, or a time string of the form \"2 days\", \"30 minutes\" etc..",
ARG arg_gas_floor_target: (String) = "4700000", or |c: &Config| c.mining.as_ref()?.gas_floor_target.clone(), ARG arg_gas_floor_target: (String) = "8000000", or |c: &Config| c.mining.as_ref()?.gas_floor_target.clone(),
"--gas-floor-target=[GAS]", "--gas-floor-target=[GAS]",
"Amount of gas per block to target when sealing a new block.", "Amount of gas per block to target when sealing a new block.",
ARG arg_gas_cap: (String) = "6283184", or |c: &Config| c.mining.as_ref()?.gas_cap.clone(), ARG arg_gas_cap: (String) = "10000000", or |c: &Config| c.mining.as_ref()?.gas_cap.clone(),
"--gas-cap=[GAS]", "--gas-cap=[GAS]",
"A cap on how large we will raise the gas limit per block due to transaction volume.", "A cap on how large we will raise the gas limit per block due to transaction volume.",
@ -865,6 +865,10 @@ usage! {
"--no-periodic-snapshot", "--no-periodic-snapshot",
"Disable automated snapshots which usually occur once every 10000 blocks.", "Disable automated snapshots which usually occur once every 10000 blocks.",
ARG arg_snapshot_threads: (Option<usize>) = None, or |c: &Config| c.snapshots.as_ref()?.processing_threads,
"--snapshot-threads=[NUM]",
"Enables multiple threads for snapshots creation.",
["Whisper Options"] ["Whisper Options"]
FLAG flag_whisper: (bool) = false, or |c: &Config| c.whisper.as_ref()?.enabled, FLAG flag_whisper: (bool) = false, or |c: &Config| c.whisper.as_ref()?.enabled,
"--whisper", "--whisper",
@ -1345,6 +1349,7 @@ struct Footprint {
#[serde(deny_unknown_fields)] #[serde(deny_unknown_fields)]
struct Snapshots { struct Snapshots {
disable_periodic: Option<bool>, disable_periodic: Option<bool>,
processing_threads: Option<usize>,
} }
#[derive(Default, Debug, PartialEq, Deserialize)] #[derive(Default, Debug, PartialEq, Deserialize)]
@ -1712,7 +1717,7 @@ mod tests {
arg_reseal_max_period: 60000u64, arg_reseal_max_period: 60000u64,
flag_reseal_on_uncle: false, flag_reseal_on_uncle: false,
arg_work_queue_size: 20usize, arg_work_queue_size: 20usize,
arg_tx_gas_limit: Some("6283184".into()), arg_tx_gas_limit: Some("10000000".into()),
arg_tx_time_limit: Some(100u64), arg_tx_time_limit: Some(100u64),
arg_relay_set: "cheap".into(), arg_relay_set: "cheap".into(),
arg_min_gas_price: Some(0u64), arg_min_gas_price: Some(0u64),
@ -1721,8 +1726,8 @@ mod tests {
arg_poll_lifetime: 60u32, arg_poll_lifetime: 60u32,
arg_usd_per_eth: "auto".into(), arg_usd_per_eth: "auto".into(),
arg_price_update_period: "hourly".into(), arg_price_update_period: "hourly".into(),
arg_gas_floor_target: "4700000".into(), arg_gas_floor_target: "8000000".into(),
arg_gas_cap: "6283184".into(), arg_gas_cap: "10000000".into(),
arg_extra_data: Some("Parity".into()), arg_extra_data: Some("Parity".into()),
flag_tx_queue_no_unfamiliar_locals: false, flag_tx_queue_no_unfamiliar_locals: false,
flag_tx_queue_no_early_reject: false, flag_tx_queue_no_early_reject: false,
@ -1771,6 +1776,7 @@ mod tests {
arg_export_state_at: "latest".into(), arg_export_state_at: "latest".into(),
arg_snapshot_at: "latest".into(), arg_snapshot_at: "latest".into(),
flag_no_periodic_snapshot: false, flag_no_periodic_snapshot: false,
arg_snapshot_threads: None,
// -- Whisper options. // -- Whisper options.
flag_whisper: false, flag_whisper: false,
@ -2021,6 +2027,7 @@ mod tests {
}), }),
snapshots: Some(Snapshots { snapshots: Some(Snapshots {
disable_periodic: Some(true), disable_periodic: Some(true),
processing_threads: None,
}), }),
misc: Some(Misc { misc: Some(Misc {
logging: Some("own_tx=trace".into()), logging: Some("own_tx=trace".into()),

View File

@ -125,13 +125,13 @@ min_gas_price = 0
usd_per_tx = "0.0001" usd_per_tx = "0.0001"
usd_per_eth = "auto" usd_per_eth = "auto"
price_update_period = "hourly" price_update_period = "hourly"
gas_floor_target = "4700000" gas_floor_target = "8000000"
gas_cap = "6283184" gas_cap = "10000000"
tx_queue_size = 8192 tx_queue_size = 8192
tx_queue_strategy = "gas_factor" tx_queue_strategy = "gas_factor"
tx_queue_ban_count = 1 tx_queue_ban_count = 1
tx_queue_ban_time = 180 #s tx_queue_ban_time = 180 #s
tx_gas_limit = "6283184" tx_gas_limit = "10000000"
tx_time_limit = 100 #ms tx_time_limit = 100 #ms
tx_queue_no_unfamiliar_locals = false tx_queue_no_unfamiliar_locals = false
tx_queue_no_early_reject = false tx_queue_no_early_reject = false

View File

@ -30,8 +30,10 @@ use sync::{NetworkConfiguration, validate_node_url, self};
use ethcore::ethstore::ethkey::{Secret, Public}; use ethcore::ethstore::ethkey::{Secret, Public};
use ethcore::client::{VMType}; use ethcore::client::{VMType};
use ethcore::miner::{stratum, MinerOptions}; use ethcore::miner::{stratum, MinerOptions};
use ethcore::snapshot::SnapshotConfiguration;
use ethcore::verification::queue::VerifierSettings; use ethcore::verification::queue::VerifierSettings;
use miner::pool; use miner::pool;
use num_cpus;
use rpc::{IpcConfiguration, HttpConfiguration, WsConfiguration}; use rpc::{IpcConfiguration, HttpConfiguration, WsConfiguration};
use parity_rpc::NetworkSettings; use parity_rpc::NetworkSettings;
@ -125,6 +127,7 @@ impl Configuration {
let update_policy = self.update_policy()?; let update_policy = self.update_policy()?;
let logger_config = self.logger_config(); let logger_config = self.logger_config();
let ws_conf = self.ws_config()?; let ws_conf = self.ws_config()?;
let snapshot_conf = self.snapshot_config()?;
let http_conf = self.http_config()?; let http_conf = self.http_config()?;
let ipc_conf = self.ipc_config()?; let ipc_conf = self.ipc_config()?;
let net_conf = self.net_config()?; let net_conf = self.net_config()?;
@ -298,6 +301,7 @@ impl Configuration {
file_path: self.args.arg_snapshot_file.clone(), file_path: self.args.arg_snapshot_file.clone(),
kind: snapshot::Kind::Take, kind: snapshot::Kind::Take,
block_at: to_block_id(&self.args.arg_snapshot_at)?, block_at: to_block_id(&self.args.arg_snapshot_at)?,
snapshot_conf: snapshot_conf,
}; };
Cmd::Snapshot(snapshot_cmd) Cmd::Snapshot(snapshot_cmd)
} else if self.args.cmd_restore { } else if self.args.cmd_restore {
@ -314,6 +318,7 @@ impl Configuration {
file_path: self.args.arg_restore_file.clone(), file_path: self.args.arg_restore_file.clone(),
kind: snapshot::Kind::Restore, kind: snapshot::Kind::Restore,
block_at: to_block_id("latest")?, // unimportant. block_at: to_block_id("latest")?, // unimportant.
snapshot_conf: snapshot_conf,
}; };
Cmd::Snapshot(restore_cmd) Cmd::Snapshot(restore_cmd)
} else if self.args.cmd_export_hardcoded_sync { } else if self.args.cmd_export_hardcoded_sync {
@ -349,6 +354,7 @@ impl Configuration {
gas_price_percentile: self.args.arg_gas_price_percentile, gas_price_percentile: self.args.arg_gas_price_percentile,
poll_lifetime: self.args.arg_poll_lifetime, poll_lifetime: self.args.arg_poll_lifetime,
ws_conf: ws_conf, ws_conf: ws_conf,
snapshot_conf: snapshot_conf,
http_conf: http_conf, http_conf: http_conf,
ipc_conf: ipc_conf, ipc_conf: ipc_conf,
net_conf: net_conf, net_conf: net_conf,
@ -374,7 +380,6 @@ impl Configuration {
private_tx_enabled, private_tx_enabled,
name: self.args.arg_identity, name: self.args.arg_identity,
custom_bootnodes: self.args.arg_bootnodes.is_some(), custom_bootnodes: self.args.arg_bootnodes.is_some(),
no_periodic_snapshot: self.args.flag_no_periodic_snapshot,
check_seal: !self.args.flag_no_seal_check, check_seal: !self.args.flag_no_seal_check,
download_old_blocks: !self.args.flag_no_ancient_blocks, download_old_blocks: !self.args.flag_no_ancient_blocks,
verifier_settings: verifier_settings, verifier_settings: verifier_settings,
@ -890,6 +895,18 @@ impl Configuration {
Ok((provider_conf, encryptor_conf, self.args.flag_private_enabled)) Ok((provider_conf, encryptor_conf, self.args.flag_private_enabled))
} }
fn snapshot_config(&self) -> Result<SnapshotConfiguration, String> {
let conf = SnapshotConfiguration {
no_periodic: self.args.flag_no_periodic_snapshot,
processing_threads: match self.args.arg_snapshot_threads {
Some(threads) if threads > 0 => threads,
_ => ::std::cmp::max(1, num_cpus::get() / 2),
},
};
Ok(conf)
}
fn network_settings(&self) -> Result<NetworkSettings, String> { fn network_settings(&self) -> Result<NetworkSettings, String> {
let http_conf = self.http_config()?; let http_conf = self.http_config()?;
let net_addresses = self.net_addresses()?; let net_addresses = self.net_addresses()?;
@ -1398,7 +1415,7 @@ mod tests {
name: "".into(), name: "".into(),
custom_bootnodes: false, custom_bootnodes: false,
fat_db: Default::default(), fat_db: Default::default(),
no_periodic_snapshot: false, snapshot_conf: Default::default(),
stratum: None, stratum: None,
check_seal: true, check_seal: true,
download_old_blocks: true, download_old_blocks: true,

View File

@ -286,7 +286,7 @@ impl Default for MinerExtras {
author: Default::default(), author: Default::default(),
engine_signer: Default::default(), engine_signer: Default::default(),
extra_data: version_data(), extra_data: version_data(),
gas_range_target: (4_700_000.into(), 6_283_184.into()), gas_range_target: (8_000_000.into(), 10_000_000.into()),
work_notify: Default::default(), work_notify: Default::default(),
} }
} }

View File

@ -25,7 +25,7 @@ use ethcore::account_provider::{AccountProvider, AccountProviderSettings};
use ethcore::client::{BlockId, CallContract, Client, Mode, DatabaseCompactionProfile, VMType, BlockChainClient, BlockInfo}; use ethcore::client::{BlockId, CallContract, Client, Mode, DatabaseCompactionProfile, VMType, BlockChainClient, BlockInfo};
use ethcore::ethstore::ethkey; use ethcore::ethstore::ethkey;
use ethcore::miner::{stratum, Miner, MinerService, MinerOptions}; use ethcore::miner::{stratum, Miner, MinerService, MinerOptions};
use ethcore::snapshot; use ethcore::snapshot::{self, SnapshotConfiguration};
use ethcore::spec::{SpecParams, OptimizeFor}; use ethcore::spec::{SpecParams, OptimizeFor};
use ethcore::verification::queue::VerifierSettings; use ethcore::verification::queue::VerifierSettings;
use ethcore_logger::{Config as LogConfig, RotatingLogger}; use ethcore_logger::{Config as LogConfig, RotatingLogger};
@ -119,7 +119,7 @@ pub struct RunCmd {
pub name: String, pub name: String,
pub custom_bootnodes: bool, pub custom_bootnodes: bool,
pub stratum: Option<stratum::Options>, pub stratum: Option<stratum::Options>,
pub no_periodic_snapshot: bool, pub snapshot_conf: SnapshotConfiguration,
pub check_seal: bool, pub check_seal: bool,
pub download_old_blocks: bool, pub download_old_blocks: bool,
pub verifier_settings: VerifierSettings, pub verifier_settings: VerifierSettings,
@ -531,6 +531,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
client_config.queue.verifier_settings = cmd.verifier_settings; client_config.queue.verifier_settings = cmd.verifier_settings;
client_config.transaction_verification_queue_size = ::std::cmp::max(2048, txpool_size / 4); client_config.transaction_verification_queue_size = ::std::cmp::max(2048, txpool_size / 4);
client_config.snapshot = cmd.snapshot_conf.clone();
// set up bootnodes // set up bootnodes
let mut net_conf = cmd.net_conf; let mut net_conf = cmd.net_conf;
@ -778,7 +779,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
}); });
// the watcher must be kept alive. // the watcher must be kept alive.
let watcher = match cmd.no_periodic_snapshot { let watcher = match cmd.snapshot_conf.no_periodic {
true => None, true => None,
false => { false => {
let sync = sync_provider.clone(); let sync = sync_provider.clone();

View File

@ -22,7 +22,7 @@ use std::sync::Arc;
use hash::keccak; use hash::keccak;
use ethcore::account_provider::AccountProvider; use ethcore::account_provider::AccountProvider;
use ethcore::snapshot::{Progress, RestorationStatus, SnapshotService as SS}; use ethcore::snapshot::{Progress, RestorationStatus, SnapshotConfiguration, SnapshotService as SS};
use ethcore::snapshot::io::{SnapshotReader, PackedReader, PackedWriter}; use ethcore::snapshot::io::{SnapshotReader, PackedReader, PackedWriter};
use ethcore::snapshot::service::Service as SnapshotService; use ethcore::snapshot::service::Service as SnapshotService;
use ethcore::client::{Mode, DatabaseCompactionProfile, VMType}; use ethcore::client::{Mode, DatabaseCompactionProfile, VMType};
@ -62,6 +62,7 @@ pub struct SnapshotCommand {
pub file_path: Option<String>, pub file_path: Option<String>,
pub kind: Kind, pub kind: Kind,
pub block_at: BlockId, pub block_at: BlockId,
pub snapshot_conf: SnapshotConfiguration,
} }
// helper for reading chunks from arbitrary reader and feeding them into the // helper for reading chunks from arbitrary reader and feeding them into the
@ -165,7 +166,7 @@ impl SnapshotCommand {
execute_upgrades(&self.dirs.base, &db_dirs, algorithm, &self.compaction)?; execute_upgrades(&self.dirs.base, &db_dirs, algorithm, &self.compaction)?;
// prepare client config // prepare client config
let client_config = to_client_config( let mut client_config = to_client_config(
&self.cache_config, &self.cache_config,
spec.name.to_lowercase(), spec.name.to_lowercase(),
Mode::Active, Mode::Active,
@ -180,6 +181,8 @@ impl SnapshotCommand {
true, true,
); );
client_config.snapshot = self.snapshot_conf;
let restoration_db_handler = db::restoration_db_handler(&client_path, &client_config); let restoration_db_handler = db::restoration_db_handler(&client_path, &client_config);
let client_db = restoration_db_handler.open(&client_path) let client_db = restoration_db_handler.open(&client_path)
.map_err(|e| format!("Failed to open database {:?}", e))?; .map_err(|e| format!("Failed to open database {:?}", e))?;

View File

@ -58,7 +58,7 @@ keccak-hash = "0.1.2"
parity-reactor = { path = "../util/reactor" } parity-reactor = { path = "../util/reactor" }
parity-updater = { path = "../updater" } parity-updater = { path = "../updater" }
parity-version = { path = "../util/version" } parity-version = { path = "../util/version" }
patricia-trie = "0.2.1" patricia-trie = "0.2"
rlp = { version = "0.2.4", features = ["ethereum"] } rlp = { version = "0.2.4", features = ["ethereum"] }
stats = { path = "../util/stats" } stats = { path = "../util/stats" }
vm = { path = "../ethcore/vm" } vm = { path = "../ethcore/vm" }

View File

@ -50,8 +50,6 @@ impl TimeProvider for DefaultTimeProvider {
const TIME_THRESHOLD: u64 = 7; const TIME_THRESHOLD: u64 = 7;
/// minimal length of hash /// minimal length of hash
const TOKEN_LENGTH: usize = 16; const TOKEN_LENGTH: usize = 16;
/// special "initial" token used for authorization when there are no tokens yet.
const INITIAL_TOKEN: &'static str = "initial";
/// Separator between fields in serialized tokens file. /// Separator between fields in serialized tokens file.
const SEPARATOR: &'static str = ";"; const SEPARATOR: &'static str = ";";
/// Number of seconds to keep unused tokens. /// Number of seconds to keep unused tokens.
@ -163,16 +161,6 @@ impl<T: TimeProvider> AuthCodes<T> {
let as_token = |code| keccak(format!("{}:{}", code, time)); let as_token = |code| keccak(format!("{}:{}", code, time));
// Check if it's the initial token.
if self.is_empty() {
let initial = &as_token(INITIAL_TOKEN) == hash;
// Initial token can be used only once.
if initial {
let _ = self.generate_new();
}
return initial;
}
// look for code // look for code
for code in &mut self.codes { for code in &mut self.codes {
if &as_token(&code.code) == hash { if &as_token(&code.code) == hash {
@ -239,7 +227,7 @@ mod tests {
} }
#[test] #[test]
fn should_return_true_if_code_is_initial_and_store_is_empty() { fn should_return_false_even_if_code_is_initial_and_store_is_empty() {
// given // given
let code = "initial"; let code = "initial";
let time = 99; let time = 99;
@ -250,7 +238,7 @@ mod tests {
let res2 = codes.is_valid(&generate_hash(code, time), time); let res2 = codes.is_valid(&generate_hash(code, time), time);
// then // then
assert_eq!(res1, true); assert_eq!(res1, false);
assert_eq!(res2, false); assert_eq!(res2, false);
} }

View File

@ -136,7 +136,7 @@ mod testing {
} }
#[test] #[test]
fn should_allow_initial_connection_but_only_once() { fn should_not_allow_initial_connection_even_once() {
// given // given
let (server, port, authcodes) = serve(); let (server, port, authcodes) = serve();
let code = "initial"; let code = "initial";
@ -160,26 +160,9 @@ mod testing {
timestamp, timestamp,
) )
); );
let response2 = http_client::request(server.addr(),
&format!("\
GET / HTTP/1.1\r\n\
Host: 127.0.0.1:{}\r\n\
Connection: Close\r\n\
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==\r\n\
Sec-WebSocket-Protocol:{:?}_{}\r\n\
Sec-WebSocket-Version: 13\r\n\
\r\n\
{{}}
",
port,
keccak(format!("{}:{}", code, timestamp)),
timestamp,
)
);
// then // then
assert_eq!(response1.status, "HTTP/1.1 101 Switching Protocols".to_owned()); assert_eq!(response1.status, "HTTP/1.1 403 Forbidden".to_owned());
assert_eq!(response2.status, "HTTP/1.1 403 Forbidden".to_owned()); http_client::assert_security_headers_present(&response1.headers, None);
http_client::assert_security_headers_present(&response2.headers, None);
} }
} }

View File

@ -22,7 +22,12 @@ use v1::traits::Net;
/// Net rpc implementation. /// Net rpc implementation.
pub struct NetClient<S: ?Sized> { pub struct NetClient<S: ?Sized> {
sync: Arc<S> sync: Arc<S>,
/// Cached `network_id`.
///
/// We cache it to avoid redundant aquire of sync read lock.
/// https://github.com/paritytech/parity-ethereum/issues/8746
network_id: u64,
} }
impl<S: ?Sized> NetClient<S> where S: SyncProvider { impl<S: ?Sized> NetClient<S> where S: SyncProvider {
@ -30,17 +35,18 @@ impl<S: ?Sized> NetClient<S> where S: SyncProvider {
pub fn new(sync: &Arc<S>) -> Self { pub fn new(sync: &Arc<S>) -> Self {
NetClient { NetClient {
sync: sync.clone(), sync: sync.clone(),
network_id: sync.status().network_id,
} }
} }
} }
impl<S: ?Sized> Net for NetClient<S> where S: SyncProvider + 'static { impl<S: ?Sized> Net for NetClient<S> where S: SyncProvider + 'static {
fn version(&self) -> Result<String> { fn version(&self) -> Result<String> {
Ok(format!("{}", self.sync.status().network_id).to_owned()) Ok(format!("{}", self.network_id))
} }
fn peer_count(&self) -> Result<String> { fn peer_count(&self) -> Result<String> {
Ok(format!("0x{:x}", self.sync.status().num_peers as u64).to_owned()) Ok(format!("{:#x}", self.sync.status().num_peers as u64))
} }
fn is_listening(&self) -> Result<bool> { fn is_listening(&self) -> Result<bool> {

View File

@ -3,9 +3,9 @@
set -e # fail on any error set -e # fail on any error
set -u # treat unset variables as error set -u # treat unset variables as error
case ${CI_COMMIT_REF_NAME} in case ${CI_COMMIT_REF_NAME} in
nightly|*v2.1*) export GRADE="devel";; nightly|*v2.2*) export GRADE="devel";;
beta|*v2.0*) export GRADE="stable";; beta|*v2.1*) export GRADE="stable";;
stable|*v1.11*) export GRADE="stable";; stable|*v2.0*) export GRADE="stable";;
*) echo "No release" exit 0;; *) echo "No release" exit 0;;
esac esac
SNAP_PACKAGE="parity_"$VERSION"_"$BUILD_ARCH".snap" SNAP_PACKAGE="parity_"$VERSION"_"$BUILD_ARCH".snap"

View File

@ -3,7 +3,7 @@
set -e # fail on any error set -e # fail on any error
set -u # treat unset variables as error set -u # treat unset variables as error
if [ "$CI_COMMIT_REF_NAME" == "beta" ]; if [ "$CI_COMMIT_REF_NAME" == "master" ];
then export DOCKER_BUILD_TAG="latest"; then export DOCKER_BUILD_TAG="latest";
else export DOCKER_BUILD_TAG=$CI_COMMIT_REF_NAME; else export DOCKER_BUILD_TAG=$CI_COMMIT_REF_NAME;
fi fi

View File

@ -4,9 +4,9 @@ set -e # fail on any error
set -u # treat unset variables as error set -u # treat unset variables as error
case ${CI_COMMIT_REF_NAME} in case ${CI_COMMIT_REF_NAME} in
nightly|*v2.1*) export CHANNEL="edge";; nightly|*v2.2*) export CHANNEL="edge";;
beta|*v2.0*) export CHANNEL="beta";; beta|*v2.1*) export CHANNEL="beta";;
stable|*v1.11*) export CHANNEL="stable";; stable|*v2.0*) export CHANNEL="stable";;
*) echo "No release" exit 0;; *) echo "No release" exit 0;;
esac esac
echo "Release channel :" $CHANNEL " Branch/tag: " $CI_COMMIT_REF_NAME echo "Release channel :" $CHANNEL " Branch/tag: " $CI_COMMIT_REF_NAME

View File

@ -14,9 +14,9 @@ RELEASE_TABLE="$(echo "${RELEASE_TABLE//\$VERSION/${VERSION}}")"
#The text in the file CANGELOG.md before which the table with links is inserted. Must be present in this file necessarily #The text in the file CANGELOG.md before which the table with links is inserted. Must be present in this file necessarily
REPLACE_TEXT="The full list of included changes:" REPLACE_TEXT="The full list of included changes:"
case ${CI_COMMIT_REF_NAME} in case ${CI_COMMIT_REF_NAME} in
nightly|*v2.1*) NAME="Parity "$VERSION" nightly";; nightly|*v2.2*) NAME="Parity "$VERSION" nightly";;
beta|*v2.0*) NAME="Parity "$VERSION" beta";; beta|*v2.1*) NAME="Parity "$VERSION" beta";;
stable|*v1.11*) NAME="Parity "$VERSION" stable";; stable|*v2.0*) NAME="Parity "$VERSION" stable";;
*) echo "No release" exit 0;; *) echo "No release" exit 0;;
esac esac
cd artifacts cd artifacts

View File

@ -42,12 +42,12 @@ upload_files() {
git push --tags git push --tags
} }
RPC_TRAITS_DIR="rpc/src/v1/traits/" RPC_TRAITS_DIR="rpc/src/v1/traits"
setup_git setup_git
clone_repos clone_repos
mkdir -p "jsonrpc/.parity/$RPC_TRAITS_DIR" mkdir -p "jsonrpc/.parity/$RPC_TRAITS_DIR"
cp -r "$RPC_TRAITS_DIR" "jsonrpc/.parity/$RPC_TRAITS_DIR" cp $RPC_TRAITS_DIR/*.rs "jsonrpc/.parity/$RPC_TRAITS_DIR"
cd jsonrpc cd jsonrpc
build_docs build_docs
cd .. cd ..

View File

@ -42,9 +42,9 @@ const PACKET_PONG: u8 = 2;
const PACKET_FIND_NODE: u8 = 3; const PACKET_FIND_NODE: u8 = 3;
const PACKET_NEIGHBOURS: u8 = 4; const PACKET_NEIGHBOURS: u8 = 4;
const PING_TIMEOUT: Duration = Duration::from_millis(300); const PING_TIMEOUT: Duration = Duration::from_millis(500);
const FIND_NODE_TIMEOUT: Duration = Duration::from_secs(2); const FIND_NODE_TIMEOUT: Duration = Duration::from_secs(2);
const EXPIRY_TIME: Duration = Duration::from_secs(60); const EXPIRY_TIME: Duration = Duration::from_secs(20);
const MAX_NODES_PING: usize = 32; // Max nodes to add/ping at once const MAX_NODES_PING: usize = 32; // Max nodes to add/ping at once
const REQUEST_BACKOFF: [Duration; 4] = [ const REQUEST_BACKOFF: [Duration; 4] = [
Duration::from_secs(1), Duration::from_secs(1),
@ -80,15 +80,29 @@ impl BucketEntry {
} }
} }
pub struct NodeBucket { struct FindNodeRequest {
nodes: VecDeque<BucketEntry>, //sorted by last active // Time when the request was sent
sent_at: Instant,
// Number of items sent by the node
response_count: usize,
// Whether the request have been answered yet
answered: bool,
} }
struct PendingRequest { struct PingRequest {
packet_id: u8, // Time when the request was sent
sent_at: Instant, sent_at: Instant,
packet_hash: H256, // The node to which the request was sent
response_count: usize, // Some requests (eg. FIND_NODE) have multi-packet responses node: NodeEntry,
// The hash sent in the Ping request
echo_hash: H256,
// The hash Parity used to respond with (until rev 01f825b0e1f1c4c420197b51fc801cbe89284b29)
#[deprecated()]
deprecated_echo_hash: H256,
}
pub struct NodeBucket {
nodes: VecDeque<BucketEntry>, //sorted by last active
} }
impl Default for NodeBucket { impl Default for NodeBucket {
@ -115,13 +129,13 @@ pub struct Discovery<'a> {
id_hash: H256, id_hash: H256,
secret: Secret, secret: Secret,
public_endpoint: NodeEndpoint, public_endpoint: NodeEndpoint,
discovery_round: u16, discovery_initiated: bool,
discovery_round: Option<u16>,
discovery_id: NodeId, discovery_id: NodeId,
discovery_nodes: HashSet<NodeId>, discovery_nodes: HashSet<NodeId>,
node_buckets: Vec<NodeBucket>, node_buckets: Vec<NodeBucket>,
in_flight_requests: HashMap<NodeId, PendingRequest>, in_flight_pings: HashMap<NodeId, PingRequest>,
expiring_pings: VecDeque<(NodeId, Instant)>, in_flight_find_nodes: HashMap<NodeId, FindNodeRequest>,
expiring_finds: VecDeque<(NodeId, Instant)>,
send_queue: VecDeque<Datagram>, send_queue: VecDeque<Datagram>,
check_timestamps: bool, check_timestamps: bool,
adding_nodes: Vec<NodeEntry>, adding_nodes: Vec<NodeEntry>,
@ -141,13 +155,13 @@ impl<'a> Discovery<'a> {
id_hash: keccak(key.public()), id_hash: keccak(key.public()),
secret: key.secret().clone(), secret: key.secret().clone(),
public_endpoint: public, public_endpoint: public,
discovery_round: 0, discovery_initiated: false,
discovery_round: None,
discovery_id: NodeId::new(), discovery_id: NodeId::new(),
discovery_nodes: HashSet::new(), discovery_nodes: HashSet::new(),
node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(), node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(),
in_flight_requests: HashMap::new(), in_flight_pings: HashMap::new(),
expiring_pings: VecDeque::new(), in_flight_find_nodes: HashMap::new(),
expiring_finds: VecDeque::new(),
send_queue: VecDeque::new(), send_queue: VecDeque::new(),
check_timestamps: true, check_timestamps: true,
adding_nodes: Vec::new(), adding_nodes: Vec::new(),
@ -175,15 +189,6 @@ impl<'a> Discovery<'a> {
} }
} }
/// Add a list of known nodes to the table.
pub fn init_node_list(&mut self, nodes: Vec<NodeEntry>) {
for n in nodes {
if self.is_allowed(&n) {
self.update_node(n);
}
}
}
fn update_node(&mut self, e: NodeEntry) -> Option<TableUpdates> { fn update_node(&mut self, e: NodeEntry) -> Option<TableUpdates> {
trace!(target: "discovery", "Inserting {:?}", &e); trace!(target: "discovery", "Inserting {:?}", &e);
let id_hash = keccak(e.id); let id_hash = keccak(e.id);
@ -224,13 +229,20 @@ impl<'a> Discovery<'a> {
/// Starts the discovery process at round 0 /// Starts the discovery process at round 0
fn start(&mut self) { fn start(&mut self) {
trace!(target: "discovery", "Starting discovery"); trace!(target: "discovery", "Starting discovery");
self.discovery_round = 0; self.discovery_round = Some(0);
self.discovery_id.randomize(); //TODO: use cryptographic nonce self.discovery_id.randomize(); //TODO: use cryptographic nonce
self.discovery_nodes.clear(); self.discovery_nodes.clear();
} }
/// Complete the discovery process
fn stop(&mut self) {
trace!(target: "discovery", "Completing discovery");
self.discovery_round = None;
self.discovery_nodes.clear();
}
fn update_new_nodes(&mut self) { fn update_new_nodes(&mut self) {
while self.in_flight_requests.len() < MAX_NODES_PING { while self.in_flight_pings.len() < MAX_NODES_PING {
match self.adding_nodes.pop() { match self.adding_nodes.pop() {
Some(next) => self.try_ping(next), Some(next) => self.try_ping(next),
None => break, None => break,
@ -239,8 +251,12 @@ impl<'a> Discovery<'a> {
} }
fn discover(&mut self) { fn discover(&mut self) {
self.update_new_nodes(); let discovery_round = match self.discovery_round {
if self.discovery_round == DISCOVERY_MAX_STEPS { Some(r) => r,
None => return,
};
if discovery_round == DISCOVERY_MAX_STEPS {
self.stop();
return; return;
} }
trace!(target: "discovery", "Starting round {:?}", self.discovery_round); trace!(target: "discovery", "Starting round {:?}", self.discovery_round);
@ -263,12 +279,10 @@ impl<'a> Discovery<'a> {
} }
if tried_count == 0 { if tried_count == 0 {
trace!(target: "discovery", "Completing discovery"); self.stop();
self.discovery_round = DISCOVERY_MAX_STEPS;
self.discovery_nodes.clear();
return; return;
} }
self.discovery_round += 1; self.discovery_round = Some(discovery_round + 1);
} }
/// The base 2 log of the distance between a and b using the XOR metric. /// The base 2 log of the distance between a and b using the XOR metric.
@ -285,14 +299,20 @@ impl<'a> Discovery<'a> {
} }
fn try_ping(&mut self, node: NodeEntry) { fn try_ping(&mut self, node: NodeEntry) {
if !self.is_allowed(&node) || if !self.is_allowed(&node) {
self.in_flight_requests.contains_key(&node.id) || trace!(target: "discovery", "Node {:?} not allowed", node);
self.adding_nodes.iter().any(|n| n.id == node.id) return;
{ }
if self.in_flight_pings.contains_key(&node.id) || self.in_flight_find_nodes.contains_key(&node.id) {
trace!(target: "discovery", "Node {:?} in flight requests", node);
return;
}
if self.adding_nodes.iter().any(|n| n.id == node.id) {
trace!(target: "discovery", "Node {:?} in adding nodes", node);
return; return;
} }
if self.in_flight_requests.len() < MAX_NODES_PING { if self.in_flight_pings.len() < MAX_NODES_PING {
self.ping(&node) self.ping(&node)
.unwrap_or_else(|e| { .unwrap_or_else(|e| {
warn!(target: "discovery", "Error sending Ping packet: {:?}", e); warn!(target: "discovery", "Error sending Ping packet: {:?}", e);
@ -308,18 +328,17 @@ impl<'a> Discovery<'a> {
self.public_endpoint.to_rlp_list(&mut rlp); self.public_endpoint.to_rlp_list(&mut rlp);
node.endpoint.to_rlp_list(&mut rlp); node.endpoint.to_rlp_list(&mut rlp);
append_expiration(&mut rlp); append_expiration(&mut rlp);
let old_parity_hash = keccak(rlp.as_raw());
let hash = self.send_packet(PACKET_PING, &node.endpoint.udp_address(), &rlp.drain())?; let hash = self.send_packet(PACKET_PING, &node.endpoint.udp_address(), &rlp.drain())?;
let request_info = PendingRequest { self.in_flight_pings.insert(node.id, PingRequest {
packet_id: PACKET_PING,
sent_at: Instant::now(), sent_at: Instant::now(),
packet_hash: hash, node: node.clone(),
response_count: 0, echo_hash: hash,
}; deprecated_echo_hash: old_parity_hash,
self.expiring_pings.push_back((node.id, request_info.sent_at)); });
self.in_flight_requests.insert(node.id, request_info);
trace!(target: "discovery", "Sent Ping to {:?}", &node.endpoint); trace!(target: "discovery", "Sent Ping to {:?} ; node_id={:#x}", &node.endpoint, node.id);
Ok(()) Ok(())
} }
@ -327,16 +346,13 @@ impl<'a> Discovery<'a> {
let mut rlp = RlpStream::new_list(2); let mut rlp = RlpStream::new_list(2);
rlp.append(target); rlp.append(target);
append_expiration(&mut rlp); append_expiration(&mut rlp);
let hash = self.send_packet(PACKET_FIND_NODE, &node.endpoint.udp_address(), &rlp.drain())?; self.send_packet(PACKET_FIND_NODE, &node.endpoint.udp_address(), &rlp.drain())?;
let request_info = PendingRequest { self.in_flight_find_nodes.insert(node.id, FindNodeRequest {
packet_id: PACKET_FIND_NODE,
sent_at: Instant::now(), sent_at: Instant::now(),
packet_hash: hash,
response_count: 0, response_count: 0,
}; answered: false,
self.expiring_finds.push_back((node.id, request_info.sent_at)); });
self.in_flight_requests.insert(node.id, request_info);
trace!(target: "discovery", "Sent FindNode to {:?}", &node.endpoint); trace!(target: "discovery", "Sent FindNode to {:?}", &node.endpoint);
Ok(()) Ok(())
@ -448,20 +464,31 @@ impl<'a> Discovery<'a> {
entry.endpoint.is_allowed(&self.ip_filter) && entry.id != self.id entry.endpoint.is_allowed(&self.ip_filter) && entry.id != self.id
} }
fn on_ping(&mut self, rlp: &Rlp, node: &NodeId, from: &SocketAddr, echo_hash: &[u8]) -> Result<Option<TableUpdates>, Error> { fn on_ping(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr, echo_hash: &[u8]) -> Result<Option<TableUpdates>, Error> {
trace!(target: "discovery", "Got Ping from {:?}", &from); trace!(target: "discovery", "Got Ping from {:?}", &from);
let source = NodeEndpoint::from_rlp(&rlp.at(1)?)?; let ping_from = NodeEndpoint::from_rlp(&rlp.at(1)?)?;
let dest = NodeEndpoint::from_rlp(&rlp.at(2)?)?; let ping_to = NodeEndpoint::from_rlp(&rlp.at(2)?)?;
let timestamp: u64 = rlp.val_at(3)?; let timestamp: u64 = rlp.val_at(3)?;
self.check_timestamp(timestamp)?; self.check_timestamp(timestamp)?;
let mut response = RlpStream::new_list(3); let mut response = RlpStream::new_list(3);
dest.to_rlp_list(&mut response); let pong_to = NodeEndpoint {
address: from.clone(),
udp_port: ping_from.udp_port
};
// Here the PONG's `To` field should be the node we are
// sending the request to
// WARNING: this field _should not be used_, but old Parity versions
// use it in order to get the node's address.
// So this is a temporary fix so that older Parity versions don't brake completely.
ping_to.to_rlp_list(&mut response);
// pong_to.to_rlp_list(&mut response);
response.append(&echo_hash); response.append(&echo_hash);
append_expiration(&mut response); append_expiration(&mut response);
self.send_packet(PACKET_PONG, from, &response.drain())?; self.send_packet(PACKET_PONG, from, &response.drain())?;
let entry = NodeEntry { id: *node, endpoint: source.clone() }; let entry = NodeEntry { id: *node_id, endpoint: pong_to.clone() };
if !entry.endpoint.is_valid() { if !entry.endpoint.is_valid() {
debug!(target: "discovery", "Got bad address: {:?}", entry); debug!(target: "discovery", "Got bad address: {:?}", entry);
} else if !self.is_allowed(&entry) { } else if !self.is_allowed(&entry) {
@ -469,40 +496,45 @@ impl<'a> Discovery<'a> {
} else { } else {
self.add_node(entry.clone()); self.add_node(entry.clone());
} }
Ok(None) Ok(None)
} }
fn on_pong(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> { fn on_pong(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
trace!(target: "discovery", "Got Pong from {:?}", &from); trace!(target: "discovery", "Got Pong from {:?} ; node_id={:#x}", &from, node_id);
let dest = NodeEndpoint::from_rlp(&rlp.at(0)?)?; let _pong_to = NodeEndpoint::from_rlp(&rlp.at(0)?)?;
let echo_hash: H256 = rlp.val_at(1)?; let echo_hash: H256 = rlp.val_at(1)?;
let timestamp: u64 = rlp.val_at(2)?; let timestamp: u64 = rlp.val_at(2)?;
self.check_timestamp(timestamp)?; self.check_timestamp(timestamp)?;
let mut node = NodeEntry { id: *node_id, endpoint: dest };
if !node.endpoint.is_valid() {
debug!(target: "discovery", "Bad address: {:?}", node);
node.endpoint.address = *from;
}
let is_expected = match self.in_flight_requests.entry(*node_id) { let expected_node = match self.in_flight_pings.entry(*node_id) {
Entry::Occupied(entry) => { Entry::Occupied(entry) => {
let is_expected = { let expected_node = {
let request = entry.get(); let request = entry.get();
request.packet_id == PACKET_PING && request.packet_hash == echo_hash if request.echo_hash != echo_hash && request.deprecated_echo_hash != echo_hash {
debug!(target: "discovery", "Got unexpected Pong from {:?} ; packet_hash={:#x} ; expected_hash={:#x}", &from, request.echo_hash, echo_hash);
None
} else {
if request.deprecated_echo_hash == echo_hash {
trace!(target: "discovery", "Got Pong from an old parity-ethereum version.");
}
Some(request.node.clone())
}
}; };
if is_expected {
if expected_node.is_some() {
entry.remove(); entry.remove();
} }
is_expected expected_node
},
Entry::Vacant(_) => {
None
}, },
Entry::Vacant(_) => false
}; };
if is_expected { if let Some(node) = expected_node {
Ok(self.update_node(node)) Ok(self.update_node(node))
} else { } else {
debug!(target: "discovery", "Got unexpected Pong from {:?}", &from); debug!(target: "discovery", "Got unexpected Pong from {:?} ; request not found", &from);
Ok(None) Ok(None)
} }
} }
@ -544,29 +576,32 @@ impl<'a> Discovery<'a> {
fn on_neighbours(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> { fn on_neighbours(&mut self, rlp: &Rlp, node_id: &NodeId, from: &SocketAddr) -> Result<Option<TableUpdates>, Error> {
let results_count = rlp.at(0)?.item_count()?; let results_count = rlp.at(0)?.item_count()?;
let is_expected = match self.in_flight_requests.entry(*node_id) { let is_expected = match self.in_flight_find_nodes.entry(*node_id) {
Entry::Occupied(mut entry) => { Entry::Occupied(mut entry) => {
let result = { let expected = {
let request = entry.get_mut(); let request = entry.get_mut();
if request.packet_id == PACKET_FIND_NODE && // Mark the request as answered
request.response_count + results_count <= BUCKET_SIZE request.answered = true;
{ if request.response_count + results_count <= BUCKET_SIZE {
request.response_count += results_count; request.response_count += results_count;
true true
} else { } else {
debug!(target: "discovery", "Got unexpected Neighbors from {:?} ; oversized packet ({} + {}) node_id={:#x}", &from, request.response_count, results_count, node_id);
false false
} }
}; };
if entry.get().response_count == BUCKET_SIZE { if entry.get().response_count == BUCKET_SIZE {
entry.remove(); entry.remove();
} }
result expected
} }
Entry::Vacant(_) => false, Entry::Vacant(_) => {
debug!(target: "discovery", "Got unexpected Neighbors from {:?} ; couldn't find node_id={:#x}", &from, node_id);
false
},
}; };
if !is_expected { if !is_expected {
debug!(target: "discovery", "Got unexpected Neighbors from {:?}", &from);
return Ok(None); return Ok(None);
} }
@ -591,65 +626,74 @@ impl<'a> Discovery<'a> {
Ok(None) Ok(None)
} }
fn check_expired(&mut self, time: Instant) -> HashSet<NodeId> { fn check_expired(&mut self, time: Instant) {
let mut removed: HashSet<NodeId> = HashSet::new(); let mut nodes_to_expire = Vec::new();
while let Some((node_id, sent_at)) = self.expiring_pings.pop_front() { self.in_flight_pings.retain(|node_id, ping_request| {
if time.duration_since(sent_at) <= PING_TIMEOUT { if time.duration_since(ping_request.sent_at) > PING_TIMEOUT {
self.expiring_pings.push_front((node_id, sent_at)); debug!(target: "discovery", "Removing expired PING request for node_id={:#x}", node_id);
break; nodes_to_expire.push(*node_id);
false
} else {
true
} }
self.expire_in_flight_request(node_id, sent_at, &mut removed); });
} self.in_flight_find_nodes.retain(|node_id, find_node_request| {
while let Some((node_id, sent_at)) = self.expiring_finds.pop_front() { if time.duration_since(find_node_request.sent_at) > FIND_NODE_TIMEOUT {
if time.duration_since(sent_at) <= FIND_NODE_TIMEOUT { if !find_node_request.answered {
self.expiring_finds.push_front((node_id, sent_at)); debug!(target: "discovery", "Removing expired FIND NODE request for node_id={:#x}", node_id);
break; nodes_to_expire.push(*node_id);
}
self.expire_in_flight_request(node_id, sent_at, &mut removed);
}
removed
}
fn expire_in_flight_request(&mut self, node_id: NodeId, sent_at: Instant, removed: &mut HashSet<NodeId>) {
if let Entry::Occupied(entry) = self.in_flight_requests.entry(node_id) {
if entry.get().sent_at == sent_at {
entry.remove();
// Attempt to remove from bucket if in one.
let id_hash = keccak(&node_id);
let dist = Discovery::distance(&self.id_hash, &id_hash)
.expect("distance is None only if id hashes are equal; will never send request to self; qed");
let bucket = &mut self.node_buckets[dist];
if let Some(index) = bucket.nodes.iter().position(|n| n.id_hash == id_hash) {
if bucket.nodes[index].fail_count < self.request_backoff.len() {
let node = &mut bucket.nodes[index];
node.backoff_until = Instant::now() + self.request_backoff[node.fail_count];
node.fail_count += 1;
trace!(
target: "discovery",
"Requests to node {:?} timed out {} consecutive time(s)",
&node.address, node.fail_count
);
} else {
removed.insert(node_id);
let node = bucket.nodes.remove(index).expect("index was located in if condition");
debug!(target: "discovery", "Removed expired node {:?}", &node.address);
}
} }
false
} else {
true
}
});
for node_id in nodes_to_expire {
self.expire_node_request(node_id);
}
}
fn expire_node_request(&mut self, node_id: NodeId) {
// Attempt to remove from bucket if in one.
let id_hash = keccak(&node_id);
let dist = Discovery::distance(&self.id_hash, &id_hash)
.expect("distance is None only if id hashes are equal; will never send request to self; qed");
let bucket = &mut self.node_buckets[dist];
if let Some(index) = bucket.nodes.iter().position(|n| n.id_hash == id_hash) {
if bucket.nodes[index].fail_count < self.request_backoff.len() {
let node = &mut bucket.nodes[index];
node.backoff_until = Instant::now() + self.request_backoff[node.fail_count];
node.fail_count += 1;
trace!(
target: "discovery",
"Requests to node {:?} timed out {} consecutive time(s)",
&node.address, node.fail_count
);
} else {
let node = bucket.nodes.remove(index).expect("index was located in if condition");
debug!(target: "discovery", "Removed expired node {:?}", &node.address);
} }
} }
} }
pub fn round(&mut self) -> Option<TableUpdates> {
let removed = self.check_expired(Instant::now()); pub fn round(&mut self) {
self.discover(); self.check_expired(Instant::now());
if !removed.is_empty() { self.update_new_nodes();
Some(TableUpdates { added: HashMap::new(), removed })
} else { None } if self.discovery_round.is_some() {
self.discover();
// Start discovering if the first pings have been sent (or timed out)
} else if self.in_flight_pings.len() == 0 && !self.discovery_initiated {
self.discovery_initiated = true;
self.refresh();
}
} }
pub fn refresh(&mut self) { pub fn refresh(&mut self) {
self.start(); if self.discovery_round.is_none() {
self.start();
}
} }
pub fn any_sends_queued(&self) -> bool { pub fn any_sends_queued(&self) -> bool {
@ -663,6 +707,16 @@ impl<'a> Discovery<'a> {
pub fn requeue_send(&mut self, datagram: Datagram) { pub fn requeue_send(&mut self, datagram: Datagram) {
self.send_queue.push_front(datagram) self.send_queue.push_front(datagram)
} }
/// Add a list of known nodes to the table.
#[cfg(test)]
pub fn init_node_list(&mut self, nodes: Vec<NodeEntry>) {
for n in nodes {
if self.is_allowed(&n) {
self.update_node(n);
}
}
}
} }
fn append_expiration(rlp: &mut RlpStream) { fn append_expiration(rlp: &mut RlpStream) {
@ -738,13 +792,13 @@ mod tests {
for i in 1..(MAX_NODES_PING+1) { for i in 1..(MAX_NODES_PING+1) {
discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() }); discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() });
assert_eq!(discovery.in_flight_requests.len(), i); assert_eq!(discovery.in_flight_pings.len(), i);
assert_eq!(discovery.send_queue.len(), i); assert_eq!(discovery.send_queue.len(), i);
assert_eq!(discovery.adding_nodes.len(), 0); assert_eq!(discovery.adding_nodes.len(), 0);
} }
for i in 1..20 { for i in 1..20 {
discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() }); discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() });
assert_eq!(discovery.in_flight_requests.len(), MAX_NODES_PING); assert_eq!(discovery.in_flight_pings.len(), MAX_NODES_PING);
assert_eq!(discovery.send_queue.len(), MAX_NODES_PING); assert_eq!(discovery.send_queue.len(), MAX_NODES_PING);
assert_eq!(discovery.adding_nodes.len(), i); assert_eq!(discovery.adding_nodes.len(), i);
} }
@ -821,23 +875,29 @@ mod tests {
assert_eq!(total_bucket_nodes(&discovery.node_buckets), 1200); assert_eq!(total_bucket_nodes(&discovery.node_buckets), 1200);
// Requests have not expired yet. // Requests have not expired yet.
let removed = discovery.check_expired(Instant::now()).len(); let num_nodes = total_bucket_nodes(&discovery.node_buckets);
discovery.check_expired(Instant::now());
let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets);
assert_eq!(removed, 0); assert_eq!(removed, 0);
// Expiring pings to bucket nodes removes them from bucket. // Expiring pings to bucket nodes removes them from bucket.
let removed = discovery.check_expired(Instant::now() + PING_TIMEOUT).len(); let num_nodes = total_bucket_nodes(&discovery.node_buckets);
discovery.check_expired(Instant::now() + PING_TIMEOUT);
let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets);
assert!(removed > 0); assert!(removed > 0);
assert_eq!(total_bucket_nodes(&discovery.node_buckets), 1200 - removed); assert_eq!(total_bucket_nodes(&discovery.node_buckets), 1200 - removed);
for _ in 0..100 { for _ in 0..100 {
discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() }); discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() });
} }
assert!(discovery.in_flight_requests.len() > 0); assert!(discovery.in_flight_pings.len() > 0);
// Expire pings to nodes that are not in buckets. // Expire pings to nodes that are not in buckets.
let removed = discovery.check_expired(Instant::now() + PING_TIMEOUT).len(); let num_nodes = total_bucket_nodes(&discovery.node_buckets);
discovery.check_expired(Instant::now() + PING_TIMEOUT);
let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets);
assert_eq!(removed, 0); assert_eq!(removed, 0);
assert_eq!(discovery.in_flight_requests.len(), 0); assert_eq!(discovery.in_flight_pings.len(), 0);
let from = SocketAddr::from_str("99.99.99.99:40445").unwrap(); let from = SocketAddr::from_str("99.99.99.99:40445").unwrap();
@ -849,7 +909,9 @@ mod tests {
discovery.on_packet(&packet, from.clone()).unwrap(); discovery.on_packet(&packet, from.clone()).unwrap();
} }
let removed = discovery.check_expired(Instant::now() + FIND_NODE_TIMEOUT).len(); let num_nodes = total_bucket_nodes(&discovery.node_buckets);
discovery.check_expired(Instant::now() + FIND_NODE_TIMEOUT);
let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets);
assert!(removed > 0); assert!(removed > 0);
// FIND_NODE does not time out because it receives k results. // FIND_NODE does not time out because it receives k results.
@ -859,7 +921,9 @@ mod tests {
discovery.on_packet(&packet, from.clone()).unwrap(); discovery.on_packet(&packet, from.clone()).unwrap();
} }
let removed = discovery.check_expired(Instant::now() + FIND_NODE_TIMEOUT).len(); let num_nodes = total_bucket_nodes(&discovery.node_buckets);
discovery.check_expired(Instant::now() + FIND_NODE_TIMEOUT);
let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets);
assert_eq!(removed, 0); assert_eq!(removed, 0);
// Test bucket evictions with retries. // Test bucket evictions with retries.
@ -868,12 +932,16 @@ mod tests {
for _ in 0..2 { for _ in 0..2 {
discovery.ping(&node_entries[101]).unwrap(); discovery.ping(&node_entries[101]).unwrap();
let removed = discovery.check_expired(Instant::now() + PING_TIMEOUT).len(); let num_nodes = total_bucket_nodes(&discovery.node_buckets);
discovery.check_expired(Instant::now() + PING_TIMEOUT);
let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets);
assert_eq!(removed, 0); assert_eq!(removed, 0);
} }
discovery.ping(&node_entries[101]).unwrap(); discovery.ping(&node_entries[101]).unwrap();
let removed = discovery.check_expired(Instant::now() + PING_TIMEOUT).len(); let num_nodes = total_bucket_nodes(&discovery.node_buckets);
discovery.check_expired(Instant::now() + PING_TIMEOUT);
let removed = num_nodes - total_bucket_nodes(&discovery.node_buckets);
assert_eq!(removed, 1); assert_eq!(removed, 1);
} }
@ -1066,9 +1134,11 @@ mod tests {
assert_eq!(ep1, NodeEndpoint::from_rlp(&rlp.at(1).unwrap()).unwrap()); assert_eq!(ep1, NodeEndpoint::from_rlp(&rlp.at(1).unwrap()).unwrap());
assert_eq!(ep2, NodeEndpoint::from_rlp(&rlp.at(2).unwrap()).unwrap()); assert_eq!(ep2, NodeEndpoint::from_rlp(&rlp.at(2).unwrap()).unwrap());
// `discovery1` should be added to node table on ping received
if let Some(_) = discovery2.on_packet(&ping_data.payload, ep1.address.clone()).unwrap() { if let Some(_) = discovery2.on_packet(&ping_data.payload, ep1.address.clone()).unwrap() {
panic!("Expected no changes to discovery2's table"); panic!("Expected no changes to discovery2's table");
} }
let pong_data = discovery2.dequeue_send().unwrap(); let pong_data = discovery2.dequeue_send().unwrap();
let data = &pong_data.payload[(32 + 65)..]; let data = &pong_data.payload[(32 + 65)..];
assert_eq!(data[0], PACKET_PONG); assert_eq!(data[0], PACKET_PONG);

View File

@ -59,8 +59,9 @@ const TCP_ACCEPT: StreamToken = SYS_TIMER + 1;
const IDLE: TimerToken = SYS_TIMER + 2; const IDLE: TimerToken = SYS_TIMER + 2;
const DISCOVERY: StreamToken = SYS_TIMER + 3; const DISCOVERY: StreamToken = SYS_TIMER + 3;
const DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 4; const DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 4;
const DISCOVERY_ROUND: TimerToken = SYS_TIMER + 5; const FAST_DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 5;
const NODE_TABLE: TimerToken = SYS_TIMER + 6; const DISCOVERY_ROUND: TimerToken = SYS_TIMER + 6;
const NODE_TABLE: TimerToken = SYS_TIMER + 7;
const FIRST_SESSION: StreamToken = 0; const FIRST_SESSION: StreamToken = 0;
const LAST_SESSION: StreamToken = FIRST_SESSION + MAX_SESSIONS - 1; const LAST_SESSION: StreamToken = FIRST_SESSION + MAX_SESSIONS - 1;
const USER_TIMER: TimerToken = LAST_SESSION + 256; const USER_TIMER: TimerToken = LAST_SESSION + 256;
@ -71,6 +72,8 @@ const SYS_TIMER: TimerToken = LAST_SESSION + 1;
const MAINTENANCE_TIMEOUT: Duration = Duration::from_secs(1); const MAINTENANCE_TIMEOUT: Duration = Duration::from_secs(1);
// for DISCOVERY_REFRESH TimerToken // for DISCOVERY_REFRESH TimerToken
const DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(60); const DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(60);
// for FAST_DISCOVERY_REFRESH TimerToken
const FAST_DISCOVERY_REFRESH_TIMEOUT: Duration = Duration::from_secs(10);
// for DISCOVERY_ROUND TimerToken // for DISCOVERY_ROUND TimerToken
const DISCOVERY_ROUND_TIMEOUT: Duration = Duration::from_millis(300); const DISCOVERY_ROUND_TIMEOUT: Duration = Duration::from_millis(300);
// for NODE_TABLE TimerToken // for NODE_TABLE TimerToken
@ -478,10 +481,10 @@ impl Host {
let socket = UdpSocket::bind(&udp_addr).expect("Error binding UDP socket"); let socket = UdpSocket::bind(&udp_addr).expect("Error binding UDP socket");
*self.udp_socket.lock() = Some(socket); *self.udp_socket.lock() = Some(socket);
discovery.init_node_list(self.nodes.read().entries());
discovery.add_node_list(self.nodes.read().entries()); discovery.add_node_list(self.nodes.read().entries());
*self.discovery.lock() = Some(discovery); *self.discovery.lock() = Some(discovery);
io.register_stream(DISCOVERY)?; io.register_stream(DISCOVERY)?;
io.register_timer(FAST_DISCOVERY_REFRESH, FAST_DISCOVERY_REFRESH_TIMEOUT)?;
io.register_timer(DISCOVERY_REFRESH, DISCOVERY_REFRESH_TIMEOUT)?; io.register_timer(DISCOVERY_REFRESH, DISCOVERY_REFRESH_TIMEOUT)?;
io.register_timer(DISCOVERY_ROUND, DISCOVERY_ROUND_TIMEOUT)?; io.register_timer(DISCOVERY_ROUND, DISCOVERY_ROUND_TIMEOUT)?;
} }
@ -533,6 +536,18 @@ impl Host {
} }
} }
fn has_enough_peers(&self) -> bool {
let min_peers = {
let info = self.info.read();
let config = &info.config;
config.min_peers
};
let (_, egress_count, ingress_count) = self.session_count();
return egress_count + ingress_count >= min_peers as usize;
}
fn connect_peers(&self, io: &IoContext<NetworkIoMessage>) { fn connect_peers(&self, io: &IoContext<NetworkIoMessage>) {
let (min_peers, mut pin, max_handshakes, allow_ips, self_id) = { let (min_peers, mut pin, max_handshakes, allow_ips, self_id) = {
let info = self.info.read(); let info = self.info.read();
@ -1014,16 +1029,23 @@ impl IoHandler<NetworkIoMessage> for Host {
IDLE => self.maintain_network(io), IDLE => self.maintain_network(io),
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io), FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
DISCOVERY_REFRESH => { DISCOVERY_REFRESH => {
if let Some(d) = self.discovery.lock().as_mut() { // Run the _slow_ discovery if enough peers are connected
d.refresh(); if !self.has_enough_peers() {
} return;
}
self.discovery.lock().as_mut().map(|d| d.refresh());
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
},
FAST_DISCOVERY_REFRESH => {
// Run the fast discovery if not enough peers are connected
if self.has_enough_peers() {
return;
}
self.discovery.lock().as_mut().map(|d| d.refresh());
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
}, },
DISCOVERY_ROUND => { DISCOVERY_ROUND => {
let node_changes = { self.discovery.lock().as_mut().and_then(|d| d.round()) }; self.discovery.lock().as_mut().map(|d| d.round());
if let Some(node_changes) = node_changes {
self.update_nodes(io, node_changes);
}
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
}, },
NODE_TABLE => { NODE_TABLE => {

View File

@ -385,7 +385,7 @@ impl NodeTable {
None => return, None => return,
}; };
if let Err(e) = fs::create_dir_all(&path) { if let Err(e) = fs::create_dir_all(&path) {
warn!("Error creating node table directory: {:?}", e); warn!(target: "network", "Error creating node table directory: {:?}", e);
return; return;
} }
path.push(NODES_FILE); path.push(NODES_FILE);
@ -400,11 +400,11 @@ impl NodeTable {
match fs::File::create(&path) { match fs::File::create(&path) {
Ok(file) => { Ok(file) => {
if let Err(e) = serde_json::to_writer_pretty(file, &table) { if let Err(e) = serde_json::to_writer_pretty(file, &table) {
warn!("Error writing node table file: {:?}", e); warn!(target: "network", "Error writing node table file: {:?}", e);
} }
}, },
Err(e) => { Err(e) => {
warn!("Error creating node table file: {:?}", e); warn!(target: "network", "Error creating node table file: {:?}", e);
} }
} }
} }
@ -418,7 +418,7 @@ impl NodeTable {
let file = match fs::File::open(&path) { let file = match fs::File::open(&path) {
Ok(file) => file, Ok(file) => file,
Err(e) => { Err(e) => {
debug!("Error opening node table file: {:?}", e); debug!(target: "network", "Error opening node table file: {:?}", e);
return Default::default(); return Default::default();
}, },
}; };
@ -431,7 +431,7 @@ impl NodeTable {
.collect() .collect()
}, },
Err(e) => { Err(e) => {
warn!("Error reading node table file: {:?}", e); warn!(target: "network", "Error reading node table file: {:?}", e);
Default::default() Default::default()
}, },
} }

View File

@ -6,7 +6,7 @@ description = "Merkle-Patricia Trie (Ethereum Style)"
license = "GPL-3.0" license = "GPL-3.0"
[dependencies] [dependencies]
patricia-trie = "0.2.1" patricia-trie = "0.2"
keccak-hasher = { version = "0.1.1", path = "../keccak-hasher" } keccak-hasher = { version = "0.1.1", path = "../keccak-hasher" }
hashdb = "0.2" hashdb = "0.2"
rlp = { version = "0.2.4", features = ["ethereum"] } rlp = { version = "0.2.4", features = ["ethereum"] }

View File

@ -10,15 +10,15 @@ build = "build.rs"
[package.metadata] [package.metadata]
# This versions track. Should be changed to `stable` or `beta` when on respective branches. # This versions track. Should be changed to `stable` or `beta` when on respective branches.
# Used by auto-updater and for Parity version string. # Used by auto-updater and for Parity version string.
track = "nightly" track = "beta"
# Network specific settings, used ONLY by auto-updater. # Network specific settings, used ONLY by auto-updater.
# Latest supported fork blocks. # Latest supported fork blocks.
# Indicates a critical release in this track (i.e. consensus issue). # Indicates a critical release in this track (i.e. consensus issue).
[package.metadata.networks] [package.metadata.networks]
foundation = { forkBlock = 4370000, critical = false } foundation = { forkBlock = 4370000, critical = true }
ropsten = { forkBlock = 10, critical = false } ropsten = { forkBlock = 10, critical = true }
kovan = { forkBlock = 6600000, critical = false } kovan = { forkBlock = 6600000, critical = true }
[dependencies] [dependencies]
parity-bytes = "0.1" parity-bytes = "0.1"