Backport commits to beta (#1763)

* Don't try to sync to ancient blocks

* Parallel block body download

* Fixed reading chunked EIP8 handshake (#1712)

* Fixed reading chunked EIP8 handshake

* Added missing break

* Disconnect peers on a fork

* Updated json-ipc-server

* Combine mining queue and enabled into single locked datum (#1749)

* Combine mining queue and enabled into single locked datum

Additional tracing.

* Fix bug uncovered by test.

* Fix typo

* Remove unneeded log initialisation in test.

* fix failing test (#1756)

* Fixed test

* Suicides tracing (#1688)

* tracing suicide

* fixed #1635

* fixed typo

* Stackoverflow #1686 (#1698)

* flat trace serialization

* tracing finds transaction which creates contract

* flatten traces before inserting them to the db

* Trace other types of calls (#1727)

* Trace through DELEGATECALL and CALLCODE

Add them to the JSON output and RLP database store.

* Fix tests.

* Fix all tests.

* Fix one more test.

* filtering transactions toAddress includes contract creation (#1697)

* tracing finds transaction which creates contract

* comma cleanup

Remove when following `}`s, add to final entries.

* Various improvements to tracing & diagnostics. (#1707)

* Various improvements to tracing & diagnostics.

- Manage possibility of `Account` not having code for `PodAccount`
- New RPC: `trace_sendRawTransaction`
- See raw transaction dump when inspecting over RPC

* Fix test

* Remove one of the dupe error messages

* Remove unneeded `&`s

* Reformat and extremely minor optimisation

* Minor optimisation

* Remove unneeded let

* Fix tests.

* Additional fix.

* Minor rename.

* Bowing to the pressure.

* Stackoverflow fix (#1742)

* executive tracer builds flat traces without intermediate struct

* temporarilt commented out tests for traces

* fixed new way of building trace address

* fixed new way of building trace address

* updating state tests with flat tracing in progress

* fixed flat tracing tests

* fixed compiling ethcore-rpc with new flat traces

* removed warnings from ethcore module

* remove unused data structures
This commit is contained in:
Arkadiy Paronyan
2016-07-30 15:37:18 +02:00
committed by Gav Wood
parent 429c83f92f
commit 8017daf47c
47 changed files with 1441 additions and 891 deletions

View File

@@ -120,13 +120,22 @@ impl BlockCollection {
if let Some(head) = head {
match self.blocks.get(&head) {
Some(block) if block.body.is_none() && !self.downloading_bodies.contains(&head) => {
self.downloading_bodies.insert(head.clone());
needed_bodies.push(head.clone());
}
_ => (),
}
}
}
self.downloading_bodies.extend(needed_bodies.iter());
for h in self.header_ids.values() {
if needed_bodies.len() >= count {
break;
}
if !self.downloading_bodies.contains(h) {
needed_bodies.push(h.clone());
self.downloading_bodies.insert(h.clone());
}
}
needed_bodies
}
@@ -286,6 +295,7 @@ impl BlockCollection {
self.parents.insert(info.parent_hash.clone(), hash.clone());
self.blocks.insert(hash.clone(), block);
trace!(target: "sync", "New header: {}", hash.hex());
Ok(hash)
}

View File

@@ -113,14 +113,15 @@ const MAX_NODE_DATA_TO_SEND: usize = 1024;
const MAX_RECEIPTS_TO_SEND: usize = 1024;
const MAX_RECEIPTS_HEADERS_TO_SEND: usize = 256;
const MAX_HEADERS_TO_REQUEST: usize = 128;
const MAX_BODIES_TO_REQUEST: usize = 64;
const MAX_BODIES_TO_REQUEST: usize = 128;
const MIN_PEERS_PROPAGATION: usize = 4;
const MAX_PEERS_PROPAGATION: usize = 128;
const MAX_PEER_LAG_PROPAGATION: BlockNumber = 20;
const SUBCHAIN_SIZE: usize = 64;
const SUBCHAIN_SIZE: usize = 256;
const MAX_ROUND_PARENTS: usize = 32;
const MAX_NEW_HASHES: usize = 64;
const MAX_TX_TO_IMPORT: usize = 512;
const MAX_NEW_BLOCK_AGE: BlockNumber = 20;
const STATUS_PACKET: u8 = 0x00;
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
@@ -136,7 +137,9 @@ const NODE_DATA_PACKET: u8 = 0x0e;
const GET_RECEIPTS_PACKET: u8 = 0x0f;
const RECEIPTS_PACKET: u8 = 0x10;
const CONNECTION_TIMEOUT_SEC: f64 = 15f64;
const HEADERS_TIMEOUT_SEC: f64 = 15f64;
const BODIES_TIMEOUT_SEC: f64 = 5f64;
const FORK_HEADER_TIMEOUT_SEC: f64 = 3f64;
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
/// Sync state
@@ -184,6 +187,7 @@ pub struct SyncStatus {
/// Peer data type requested
enum PeerAsking {
Nothing,
ForkHeader,
BlockHeaders,
BlockBodies,
Heads,
@@ -212,6 +216,16 @@ struct PeerInfo {
asking_hash: Option<H256>,
/// Request timestamp
ask_time: f64,
/// Pending request is expird and result should be ignored
expired: bool,
/// Peer fork confirmed
confirmed: bool,
}
impl PeerInfo {
fn is_available(&self) -> bool {
self.confirmed && !self.expired
}
}
/// Blockchain sync handler.
@@ -245,6 +259,8 @@ pub struct ChainSync {
round_parents: VecDeque<(H256, H256)>,
/// Network ID
network_id: U256,
/// Optional fork block to check
fork_block: Option<(BlockNumber, H256)>,
}
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
@@ -268,6 +284,7 @@ impl ChainSync {
round_parents: VecDeque::new(),
_max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id,
fork_block: config.fork_block,
};
sync.reset();
sync
@@ -284,8 +301,8 @@ impl ChainSync {
highest_block_number: self.highest_block.map(|n| max(n, self.last_imported_block)),
blocks_received: if self.last_imported_block > self.starting_block { self.last_imported_block - self.starting_block } else { 0 },
blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 },
num_peers: self.peers.len(),
num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(),
num_peers: self.peers.values().filter(|p| p.confirmed).count(),
num_active_peers: self.peers.values().filter(|p| p.confirmed && p.asking != PeerAsking::Nothing).count(),
mem_used:
self.blocks.heap_size()
+ self.peers.heap_size_of_children()
@@ -306,6 +323,10 @@ impl ChainSync {
for (_, ref mut p) in &mut self.peers {
p.asking_blocks.clear();
p.asking_hash = None;
// mark any pending requests as expired
if p.asking != PeerAsking::Nothing && p.confirmed {
p.expired = true;
}
}
self.syncing_difficulty = From::from(0u64);
self.state = SyncState::Idle;
@@ -356,6 +377,8 @@ impl ChainSync {
asking_blocks: Vec::new(),
asking_hash: None,
ask_time: 0f64,
expired: false,
confirmed: self.fork_block.is_none(),
};
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
@@ -383,18 +406,43 @@ impl ChainSync {
self.peers.insert(peer_id.clone(), peer);
self.active_peers.insert(peer_id.clone());
debug!(target: "sync", "Connected {}:{}", peer_id, io.peer_info(peer_id));
self.sync_peer(io, peer_id, false);
if let Some((fork_block, _)) = self.fork_block {
self.request_headers_by_number(io, peer_id, fork_block, 1, 0, false, PeerAsking::ForkHeader);
} else {
self.sync_peer(io, peer_id, false);
}
Ok(())
}
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
/// Called by peer once it has new block headers during sync
fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
let confirmed = match self.peers.get_mut(&peer_id) {
Some(ref mut peer) if peer.asking == PeerAsking::ForkHeader => {
let item_count = r.item_count();
if item_count == 0 || (item_count == 1 && try!(r.at(0)).as_raw().sha3() == self.fork_block.unwrap().1) {
trace!(target: "sync", "{}: Confirmed peer", peer_id);
peer.asking = PeerAsking::Nothing;
peer.confirmed = true;
true
} else {
trace!(target: "sync", "{}: Fork mismatch", peer_id);
io.disconnect_peer(peer_id);
false
}
},
_ => false,
};
if confirmed {
self.sync_peer(io, peer_id, false);
return Ok(());
}
self.clear_peer_download(peer_id);
let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash);
let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders };
let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash);
if !self.reset_peer_asking(peer_id, expected_asking) || expected_hash.is_none() {
trace!(target: "sync", "Ignored unexpected headers");
trace!(target: "sync", "{}: Ignored unexpected headers", peer_id);
self.continue_sync(io);
return Ok(());
}
@@ -460,14 +508,14 @@ impl ChainSync {
// Disable the peer for this syncing round if it gives invalid chain
if !valid_response {
trace!(target: "sync", "{} Deactivated for invalid headers response", peer_id);
self.deactivate_peer(io, peer_id);
trace!(target: "sync", "{} Disabled for invalid headers response", peer_id);
io.disable_peer(peer_id);
}
if headers.is_empty() {
// Peer does not have any new subchain heads, deactivate it nd try with another
trace!(target: "sync", "{} Deactivated for no data", peer_id);
self.deactivate_peer(io, peer_id);
trace!(target: "sync", "{} Disabled for no data", peer_id);
io.disable_peer(peer_id);
}
match self.state {
SyncState::ChainHead => {
@@ -491,6 +539,9 @@ impl ChainSync {
}
self.collect_blocks(io);
// give a task to the same peer first if received valuable headers.
self.sync_peer(io, peer_id, false);
// give tasks to other peers
self.continue_sync(io);
Ok(())
}
@@ -543,6 +594,11 @@ impl ChainSync {
peer.latest_hash = header.hash();
peer.latest_number = Some(header.number());
}
if self.last_imported_block > header.number() && self.last_imported_block - header.number() > MAX_NEW_BLOCK_AGE {
trace!(target: "sync", "Ignored ancient new block {:?}", h);
io.disable_peer(peer_id);
return Ok(());
}
match io.chain().import_block(block_rlp.as_raw().to_vec()) {
Err(Error::Import(ImportError::AlreadyInChain)) => {
trace!(target: "sync", "New block already in chain {:?}", h);
@@ -600,34 +656,39 @@ impl ChainSync {
let hashes = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::<H256>(0), item.val_at::<BlockNumber>(1)));
let mut max_height: BlockNumber = 0;
let mut new_hashes = Vec::new();
for (rh, rd) in hashes {
let h = try!(rh);
let d = try!(rd);
if d > self.highest_block.unwrap_or(0) {
self.highest_block = Some(d);
for (rh, rn) in hashes {
let hash = try!(rh);
let number = try!(rn);
if number > self.highest_block.unwrap_or(0) {
self.highest_block = Some(number);
}
if self.blocks.is_downloading(&h) {
if self.blocks.is_downloading(&hash) {
continue;
}
match io.chain().block_status(BlockID::Hash(h.clone())) {
if self.last_imported_block > number && self.last_imported_block - number > MAX_NEW_BLOCK_AGE {
trace!(target: "sync", "Ignored ancient new block hash {:?}", hash);
io.disable_peer(peer_id);
continue;
}
match io.chain().block_status(BlockID::Hash(hash.clone())) {
BlockStatus::InChain => {
trace!(target: "sync", "New block hash already in chain {:?}", h);
trace!(target: "sync", "New block hash already in chain {:?}", hash);
},
BlockStatus::Queued => {
trace!(target: "sync", "New hash block already queued {:?}", h);
trace!(target: "sync", "New hash block already queued {:?}", hash);
},
BlockStatus::Unknown => {
new_hashes.push(h.clone());
if d > max_height {
trace!(target: "sync", "New unknown block hash {:?}", h);
new_hashes.push(hash.clone());
if number > max_height {
trace!(target: "sync", "New unknown block hash {:?}", hash);
let peer = self.peers.get_mut(&peer_id).unwrap();
peer.latest_hash = h.clone();
peer.latest_number = Some(d);
max_height = d;
peer.latest_hash = hash.clone();
peer.latest_number = Some(number);
max_height = number;
}
},
BlockStatus::Bad => {
debug!(target: "sync", "Bad new block hash {:?}", h);
debug!(target: "sync", "Bad new block hash {:?}", hash);
io.disable_peer(peer_id);
return Ok(());
}
@@ -665,7 +726,8 @@ impl ChainSync {
/// Resume downloading
fn continue_sync(&mut self, io: &mut SyncIo) {
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty.unwrap_or_else(U256::zero))).collect();
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().filter_map(|(k, p)|
if p.is_available() { Some((*k, p.difficulty.unwrap_or_else(U256::zero))) } else { None }).collect();
thread_rng().shuffle(&mut peers); //TODO: sort by rating
trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
for (p, _) in peers {
@@ -673,7 +735,7 @@ impl ChainSync {
self.sync_peer(io, p, false);
}
}
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing) {
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.is_available()) {
self.complete_sync();
}
}
@@ -699,7 +761,7 @@ impl ChainSync {
}
let (peer_latest, peer_difficulty) = {
let peer = self.peers.get_mut(&peer_id).unwrap();
if peer.asking != PeerAsking::Nothing {
if peer.asking != PeerAsking::Nothing || !peer.is_available() {
return;
}
if self.state == SyncState::Waiting {
@@ -727,7 +789,9 @@ impl ChainSync {
// Request subchain headers
trace!(target: "sync", "Starting sync with better chain");
let last = self.last_imported_hash.clone();
self.request_headers_by_hash(io, peer_id, &last, SUBCHAIN_SIZE, MAX_HEADERS_TO_REQUEST - 1, false, PeerAsking::Heads);
// Request MAX_HEADERS_TO_REQUEST - 2 headers apart so that
// MAX_HEADERS_TO_REQUEST would include headers for neighbouring subchains
self.request_headers_by_hash(io, peer_id, &last, SUBCHAIN_SIZE, MAX_HEADERS_TO_REQUEST - 2, false, PeerAsking::Heads);
},
SyncState::Blocks | SyncState::NewBlocks => {
if io.chain().block_status(BlockID::Hash(peer_latest)) == BlockStatus::Unknown {
@@ -895,6 +959,17 @@ impl ChainSync {
.asking_hash = Some(h.clone());
}
/// Request headers from a peer by block number
#[cfg_attr(feature="dev", allow(too_many_arguments))]
fn request_headers_by_number(&mut self, sync: &mut SyncIo, peer_id: PeerId, n: BlockNumber, count: usize, skip: usize, reverse: bool, asking: PeerAsking) {
trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, n);
let mut rlp = RlpStream::new_list(4);
rlp.append(&n);
rlp.append(&count);
rlp.append(&skip);
rlp.append(&if reverse {1u32} else {0u32});
self.send_request(sync, peer_id, asking, GET_BLOCK_HEADERS_PACKET, rlp.out());
}
/// Request block bodies from a peer
fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: PeerId, hashes: Vec<H256>) {
let mut rlp = RlpStream::new_list(hashes.len());
@@ -908,6 +983,7 @@ impl ChainSync {
/// Reset peer status after request is complete.
fn reset_peer_asking(&mut self, peer_id: PeerId, asking: PeerAsking) -> bool {
let peer = self.peers.get_mut(&peer_id).unwrap();
peer.expired = false;
if peer.asking != asking {
trace!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking);
peer.asking = PeerAsking::Nothing;
@@ -947,6 +1023,9 @@ impl ChainSync {
if !io.is_chain_queue_empty() {
return Ok(());
}
if self.peers.get(&peer_id).map_or(false, |p| p.confirmed) {
trace!(target: "sync", "{} Ignoring transactions from unconfirmed/unknown peer", peer_id);
}
let mut item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
@@ -1178,7 +1257,13 @@ impl ChainSync {
let tick = time::precise_time_s();
let mut aborting = Vec::new();
for (peer_id, peer) in &self.peers {
if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC {
let timeout = match peer.asking {
PeerAsking::BlockHeaders | PeerAsking::Heads => (tick - peer.ask_time) > HEADERS_TIMEOUT_SEC,
PeerAsking::BlockBodies => (tick - peer.ask_time) > BODIES_TIMEOUT_SEC,
PeerAsking::Nothing => false,
PeerAsking::ForkHeader => (tick - peer.ask_time) > FORK_HEADER_TIMEOUT_SEC,
};
if timeout {
trace!(target:"sync", "Timeout {}", peer_id);
io.disconnect_peer(*peer_id);
aborting.push(*peer_id);
@@ -1593,6 +1678,8 @@ mod tests {
asking_blocks: Vec::new(),
asking_hash: None,
ask_time: 0f64,
expired: false,
confirmed: false,
});
sync
}

View File

@@ -72,8 +72,9 @@ use std::ops::*;
use std::sync::*;
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId};
use util::TimerToken;
use util::{U256, ONE_U256};
use util::{U256, ONE_U256, H256};
use ethcore::client::Client;
use ethcore::header::BlockNumber;
use ethcore::service::{SyncMessage, NetSyncMessage};
use io::NetSyncIo;
use util::io::IoChannel;
@@ -93,6 +94,8 @@ pub struct SyncConfig {
pub max_download_ahead_blocks: usize,
/// Network ID
pub network_id: U256,
/// Optional fork block id
pub fork_block: Option<(BlockNumber, H256)>,
}
impl Default for SyncConfig {
@@ -100,6 +103,7 @@ impl Default for SyncConfig {
SyncConfig {
max_download_ahead_blocks: 20000,
network_id: ONE_U256,
fork_block: None,
}
}
}

View File

@@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use util::*;
use ethcore::client::{BlockChainClient, BlockID, EachBlockWith};
use ethcore::client::{TestBlockChainClient, BlockChainClient, BlockID, EachBlockWith};
use chain::{SyncState};
use super::helpers::*;
@@ -95,6 +95,25 @@ fn forked() {
assert_eq!(net.peer(2).chain.numbers.read().unwrap().deref(), &peer1_chain);
}
#[test]
fn net_hard_fork() {
::env_logger::init().ok();
let ref_client = TestBlockChainClient::new();
ref_client.add_blocks(50, EachBlockWith::Uncle);
{
let mut net = TestNet::new_with_fork(2, Some((50, ref_client.block_hash(BlockID::Number(50)).unwrap())));
net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Uncle);
net.sync();
assert_eq!(net.peer(1).chain.chain_info().best_block_number, 100);
}
{
let mut net = TestNet::new_with_fork(2, Some((50, ref_client.block_hash(BlockID::Number(50)).unwrap())));
net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Nothing);
net.sync();
assert_eq!(net.peer(1).chain.chain_info().best_block_number, 0);
}
}
#[test]
fn restart() {
let mut net = TestNet::new(3);
@@ -108,7 +127,7 @@ fn restart() {
net.restart_peer(0);
let status = net.peer(0).sync.read().unwrap().status();
assert_eq!(status.state, SyncState::ChainHead);
assert_eq!(status.state, SyncState::Idle);
}
#[test]

View File

@@ -16,6 +16,7 @@
use util::*;
use ethcore::client::{TestBlockChainClient, BlockChainClient};
use ethcore::header::BlockNumber;
use io::SyncIo;
use chain::ChainSync;
use ::SyncConfig;
@@ -89,13 +90,19 @@ pub struct TestNet {
impl TestNet {
pub fn new(n: usize) -> TestNet {
Self::new_with_fork(n, None)
}
pub fn new_with_fork(n: usize, fork: Option<(BlockNumber, H256)>) -> TestNet {
let mut net = TestNet {
peers: Vec::new(),
started: false,
};
for _ in 0..n {
let chain = TestBlockChainClient::new();
let sync = ChainSync::new(SyncConfig::default(), &chain);
let mut config = SyncConfig::default();
config.fork_block = fork;
let sync = ChainSync::new(config, &chain);
net.peers.push(TestPeer {
sync: RwLock::new(sync),
chain: chain,