v2.6.5 beta (#11240)

* [CI] check evmbin build (#11096)
* Correct EIP-712 encoding (#11092)
* [client]: Fix for incorrectly dropped consensus messages (#11082) (#11086)
* Update hardcoded headers (foundation, classic, kovan, xdai, ewc, ...) (#11053)
* Add cargo-remote dir to .gitignore (?)
* Update light client headers: ropsten 6631425 foundation 8798209 (#11201)
* Update list of bootnodes for xDai chain (#11236)
* ethcore/res: add mordor testnet configuration (#11200)
* [chain specs]: activate Istanbul on mainnet (#11228)
* [builtin]: support multiple prices and activations in chain spec (#11039)
* [receipt]: add sender & receiver to RichReceipts (#11179)
* [ethcore/builtin]: do not panic in blake2pricer on short input (#11180)
* Made ecrecover implementation trait public (#11188)
* Fix docker centos build (#11226)
* Update MIX bootnodes. (#11203)
* Insert explicit warning into the panic hook (#11225)
* Use provided usd-per-eth value if an endpoint is specified (#11209)
* Cleanup stratum a bit (#11161)
* Add Constantinople EIPs to the dev (instant_seal) config (#10809) (already backported)
* util Host: fix a double Read Lock bug in fn Host::session_readable() (#11175)
* ethcore client: fix a double Read Lock bug in fn Client::logs() (#11172)
* Type annotation for next_key() matching of json filter options (#11192)
* Upgrade jsonrpc to latest (#11206)
* [dependencies]: jsonrpc 14.0.1 (#11183)
* Upgrade to jsonrpc v14 (#11151)
* Switching sccache from local to Redis (#10971)
* Snapshot restoration overhaul (#11219)
* Add new line after writing block to hex file. (#10984)
* Pause pruning while snapshotting (#11178)
* Change how RPCs eth_call and eth_estimateGas handle "Pending" (#11127)
* Fix block detail updating (#11015)
* Make InstantSeal Instant again #11186
* Filter out some bad ropsten warp snapshots (#11247)
This commit is contained in:
Talha Cross
2019-11-11 23:55:19 +01:00
committed by s3krit
parent 9838c9f447
commit bc90ba2863
193 changed files with 24263 additions and 3349 deletions

View File

@@ -311,7 +311,7 @@ pub struct EthSync {
light_subprotocol_name: [u8; 3],
/// Priority tasks notification channel
priority_tasks: Mutex<mpsc::Sender<PriorityTask>>,
/// for state tracking
/// Track the sync state: are we importing or verifying blocks?
is_major_syncing: Arc<AtomicBool>
}

View File

@@ -302,7 +302,7 @@ impl BlockDownloader {
}
}
}
// Update the highest block number seen on the network from the header.
if let Some((number, _)) = last_header {
if self.highest_block.as_ref().map_or(true, |n| number > *n) {
self.highest_block = Some(number);

View File

@@ -25,6 +25,7 @@ use ethereum_types::{H256, U256};
use hash::keccak;
use network::PeerId;
use network::client_version::ClientVersion;
use log::{debug, trace, error, warn};
use rlp::Rlp;
use snapshot::ChunkType;
use std::time::Instant;
@@ -85,14 +86,14 @@ impl SyncHandler {
PrivateTransactionPacket => SyncHandler::on_private_transaction(sync, io, peer, &rlp),
SignedPrivateTransactionPacket => SyncHandler::on_signed_private_transaction(sync, io, peer, &rlp),
_ => {
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id.id());
trace!(target: "sync", "{}: Unknown packet {}", peer, packet_id.id());
Ok(())
}
};
match result {
Err(DownloaderImportError::Invalid) => {
debug!(target:"sync", "{} -> Invalid packet {}", peer, packet_id.id());
trace!(target:"sync", "{} -> Invalid packet {}", peer, packet_id.id());
io.disable_peer(peer);
sync.deactivate_peer(io, peer);
},
@@ -105,7 +106,7 @@ impl SyncHandler {
},
}
} else {
debug!(target: "sync", "{}: Unknown packet {}", peer, packet_id);
trace!(target: "sync", "{}: Unknown packet {}", peer, packet_id);
}
}
@@ -126,14 +127,14 @@ impl SyncHandler {
sync.active_peers.remove(&peer_id);
if sync.state == SyncState::SnapshotManifest {
// Check if we are asking other peers for
// the snapshot manifest as well.
// If not, return to initial state
let still_asking_manifest = sync.peers.iter()
// Check if we are asking other peers for a snapshot manifest as well. If not,
// set our state to initial state (`Idle` or `WaitingPeers`).
let still_seeking_manifest = sync.peers.iter()
.filter(|&(id, p)| sync.active_peers.contains(id) && p.asking == PeerAsking::SnapshotManifest)
.next().is_none();
.next().is_some();
if still_asking_manifest {
if !still_seeking_manifest {
warn!(target: "snapshot_sync", "The peer we were downloading a snapshot from ({}) went away. Retrying.", peer_id);
sync.state = ChainSync::get_init_state(sync.warp_sync, io.chain());
}
}
@@ -380,18 +381,18 @@ impl SyncHandler {
let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks);
if !sync.reset_peer_asking(peer_id, PeerAsking::BlockHeaders) {
debug!(target: "sync", "{}: Ignored unexpected headers", peer_id);
trace!(target: "sync", "{}: Ignored unexpected headers", peer_id);
return Ok(());
}
let expected_hash = match expected_hash {
Some(hash) => hash,
None => {
debug!(target: "sync", "{}: Ignored unexpected headers (expected_hash is None)", peer_id);
trace!(target: "sync", "{}: Ignored unexpected headers (expected_hash is None)", peer_id);
return Ok(());
}
};
if !allowed {
debug!(target: "sync", "{}: Ignored unexpected headers (peer not allowed)", peer_id);
trace!(target: "sync", "{}: Ignored unexpected headers (peer not allowed)", peer_id);
return Ok(());
}
@@ -475,12 +476,12 @@ impl SyncHandler {
/// Called when snapshot manifest is downloaded from a peer.
fn on_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
trace!(target: "sync", "Ignoring snapshot manifest from unconfirmed peer {}", peer_id);
trace!(target: "snapshot_sync", "Ignoring snapshot manifest from unconfirmed peer {}", peer_id);
return Ok(());
}
sync.clear_peer_download(peer_id);
if !sync.reset_peer_asking(peer_id, PeerAsking::SnapshotManifest) || sync.state != SyncState::SnapshotManifest {
trace!(target: "sync", "{}: Ignored unexpected/expired manifest", peer_id);
trace!(target: "snapshot_sync", "{}: Ignored unexpected/expired manifest", peer_id);
return Ok(());
}
@@ -491,10 +492,12 @@ impl SyncHandler {
.map_or(false, |(l, h)| manifest.version >= l && manifest.version <= h);
if !is_supported_version {
trace!(target: "sync", "{}: Snapshot manifest version not supported: {}", peer_id, manifest.version);
warn!(target: "snapshot_sync", "{}: Snapshot manifest version not supported: {}", peer_id, manifest.version);
return Err(DownloaderImportError::Invalid);
}
sync.snapshot.reset_to(&manifest, &keccak(manifest_rlp.as_raw()));
debug!(target: "snapshot_sync", "{}: Peer sent a snapshot manifest we can use. Block number #{}, block chunks: {}, state chunks: {}",
peer_id, manifest.block_number, manifest.block_hashes.len(), manifest.state_hashes.len());
io.snapshot_service().begin_restore(manifest);
sync.state = SyncState::SnapshotData;
@@ -504,12 +507,12 @@ impl SyncHandler {
/// Called when snapshot data is downloaded from a peer.
fn on_snapshot_data(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
if !sync.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
trace!(target: "sync", "Ignoring snapshot data from unconfirmed peer {}", peer_id);
trace!(target: "snapshot_sync", "Ignoring snapshot data from unconfirmed peer {}", peer_id);
return Ok(());
}
sync.clear_peer_download(peer_id);
if !sync.reset_peer_asking(peer_id, PeerAsking::SnapshotData) || (sync.state != SyncState::SnapshotData && sync.state != SyncState::SnapshotWaiting) {
trace!(target: "sync", "{}: Ignored unexpected snapshot data", peer_id);
trace!(target: "snapshot_sync", "{}: Ignored unexpected snapshot data", peer_id);
return Ok(());
}
@@ -517,12 +520,12 @@ impl SyncHandler {
let status = io.snapshot_service().status();
match status {
RestorationStatus::Inactive | RestorationStatus::Failed => {
trace!(target: "sync", "{}: Snapshot restoration aborted", peer_id);
trace!(target: "snapshot_sync", "{}: Snapshot restoration status: {:?}", peer_id, status);
sync.state = SyncState::WaitingPeers;
// only note bad if restoration failed.
if let (Some(hash), RestorationStatus::Failed) = (sync.snapshot.snapshot_hash(), status) {
trace!(target: "sync", "Noting snapshot hash {} as bad", hash);
debug!(target: "snapshot_sync", "Marking snapshot manifest hash {} as bad", hash);
sync.snapshot.note_bad(hash);
}
@@ -530,30 +533,30 @@ impl SyncHandler {
return Ok(());
},
RestorationStatus::Initializing { .. } => {
trace!(target: "warp", "{}: Snapshot restoration is initializing", peer_id);
trace!(target: "snapshot_sync", "{}: Snapshot restoration is initializing. Can't accept data right now.", peer_id);
return Ok(());
}
RestorationStatus::Finalizing => {
trace!(target: "warp", "{}: Snapshot finalizing restoration", peer_id);
trace!(target: "snapshot_sync", "{}: Snapshot finalizing restoration. Can't accept data right now.", peer_id);
return Ok(());
}
RestorationStatus::Ongoing { .. } => {
trace!(target: "sync", "{}: Snapshot restoration is ongoing", peer_id);
trace!(target: "snapshot_sync", "{}: Snapshot restoration is ongoing", peer_id);
},
}
let snapshot_data: Bytes = r.val_at(0)?;
match sync.snapshot.validate_chunk(&snapshot_data) {
Ok(ChunkType::Block(hash)) => {
trace!(target: "sync", "{}: Processing block chunk", peer_id);
trace!(target: "snapshot_sync", "{}: Processing block chunk", peer_id);
io.snapshot_service().restore_block_chunk(hash, snapshot_data);
}
Ok(ChunkType::State(hash)) => {
trace!(target: "sync", "{}: Processing state chunk", peer_id);
trace!(target: "snapshot_sync", "{}: Processing state chunk", peer_id);
io.snapshot_service().restore_state_chunk(hash, snapshot_data);
}
Err(()) => {
trace!(target: "sync", "{}: Got bad snapshot chunk", peer_id);
trace!(target: "snapshot_sync", "{}: Got bad snapshot chunk", peer_id);
io.disconnect_peer(peer_id);
return Ok(());
}
@@ -575,7 +578,7 @@ impl SyncHandler {
let warp_protocol = warp_protocol_version != 0;
let private_tx_protocol = warp_protocol_version >= PAR_PROTOCOL_VERSION_3.0;
let peer = PeerInfo {
protocol_version: protocol_version,
protocol_version,
network_id: r.val_at(1)?,
difficulty: Some(r.val_at(2)?),
latest_hash: r.val_at(3)?,
@@ -603,7 +606,8 @@ impl SyncHandler {
latest:{}, \
genesis:{}, \
snapshot:{:?}, \
private_tx_enabled:{})",
private_tx_enabled:{}, \
client_version: {})",
peer_id,
peer.protocol_version,
peer.network_id,
@@ -611,7 +615,8 @@ impl SyncHandler {
peer.latest_hash,
peer.genesis,
peer.snapshot_number,
peer.private_tx_enabled
peer.private_tx_enabled,
peer.client_version,
);
if io.is_expired() {
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_version(peer_id));

View File

@@ -159,18 +159,29 @@ const MAX_NEW_BLOCK_AGE: BlockNumber = 20;
// maximal packet size with transactions (cannot be greater than 16MB - protocol limitation).
// keep it under 8MB as well, cause it seems that it may result oversized after compression.
const MAX_TRANSACTION_PACKET_SIZE: usize = 5 * 1024 * 1024;
// Min number of blocks to be behind for a snapshot sync
// Min number of blocks to be behind the tip for a snapshot sync to be considered useful to us.
const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 30000;
/// We prefer to sync snapshots that are available from this many peers. If we have not found a
/// snapshot available from `SNAPSHOT_MIN_PEERS` peers within `WAIT_PEERS_TIMEOUT`, then we make do
/// with a single peer to sync from.
const SNAPSHOT_MIN_PEERS: usize = 3;
/// To keep memory from growing uncontrollably we restore chunks as we download them and write them
/// to disk only after we have processed them; we also want to avoid pausing the chunk download too
/// often, so we allow a little bit of leeway here and let the downloading be
/// `MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD` chunks ahead of the restoration.
const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 5;
const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;
const WAIT_PEERS_TIMEOUT: Duration = Duration::from_secs(5);
const STATUS_TIMEOUT: Duration = Duration::from_secs(5);
/// Time to wait for snapshotting peers to show up with a snapshot we want to use. Beyond this time,
/// a single peer is enough to start downloading.
const WAIT_PEERS_TIMEOUT: Duration = Duration::from_secs(10);
/// Time to wait for a peer to start being useful to us in some form. After this they are
/// disconnected.
const STATUS_TIMEOUT: Duration = Duration::from_secs(10);
const HEADERS_TIMEOUT: Duration = Duration::from_secs(15);
const BODIES_TIMEOUT: Duration = Duration::from_secs(20);
const RECEIPTS_TIMEOUT: Duration = Duration::from_secs(10);
const FORK_HEADER_TIMEOUT: Duration = Duration::from_secs(3);
/// Max time to wait for the Snapshot Manifest packet to arrive from a peer after it's being asked.
const SNAPSHOT_MANIFEST_TIMEOUT: Duration = Duration::from_secs(5);
const SNAPSHOT_DATA_TIMEOUT: Duration = Duration::from_secs(120);
@@ -262,7 +273,7 @@ impl SyncStatus {
}
#[derive(PartialEq, Eq, Debug, Clone)]
/// Peer data type requested
/// Peer data type requested from a peer by us.
pub enum PeerAsking {
Nothing,
ForkHeader,
@@ -281,7 +292,7 @@ pub enum BlockSet {
/// Missing old blocks
OldBlocks,
}
#[derive(Clone, Eq, PartialEq)]
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum ForkConfirmation {
/// Fork block confirmation pending.
Unconfirmed,
@@ -291,7 +302,7 @@ pub enum ForkConfirmation {
Confirmed,
}
#[derive(Clone)]
#[derive(Clone, Debug)]
/// Syncing peer information
pub struct PeerInfo {
/// eth protocol version
@@ -304,7 +315,7 @@ pub struct PeerInfo {
latest_hash: H256,
/// Peer total difficulty if known
difficulty: Option<U256>,
/// Type of data currenty being requested from peer.
/// Type of data currently being requested by us from a peer.
asking: PeerAsking,
/// A set of block numbers being requested
asking_blocks: Vec<H256>,
@@ -431,7 +442,8 @@ impl ChainSyncApi {
///
/// NOTE This method should only handle stuff that can be canceled and would reach other peers
/// by other means.
pub fn process_priority_queue(&self, io: &mut SyncIo) {
/// Called every `PRIORITY_TIMER` (0.25sec)
pub fn process_priority_queue(&self, io: &mut dyn SyncIo) {
fn check_deadline(deadline: Instant) -> Option<Duration> {
let now = Instant::now();
if now > deadline {
@@ -565,12 +577,26 @@ impl ChainSync {
peers
}
fn get_init_state(warp_sync: WarpSync, chain: &BlockChainClient) -> SyncState {
/// Reset the client to its initial state:
/// - if warp sync is enabled, start looking for peers to sync a snapshot from
/// - if `--warp-barrier` is used, ensure we're not synced beyond the barrier and start
/// looking for peers to sync a snapshot from
/// - otherwise, go `Idle`.
fn get_init_state(warp_sync: WarpSync, chain: &dyn BlockChainClient) -> SyncState {
let best_block = chain.chain_info().best_block_number;
match warp_sync {
WarpSync::Enabled => SyncState::WaitingPeers,
WarpSync::OnlyAndAfter(block) if block > best_block => SyncState::WaitingPeers,
_ => SyncState::Idle,
WarpSync::Enabled => {
debug!(target: "sync", "Setting the initial state to `WaitingPeers`. Our best block: #{}; warp_sync: {:?}", best_block, warp_sync);
SyncState::WaitingPeers
},
WarpSync::OnlyAndAfter(block) if block > best_block => {
debug!(target: "sync", "Setting the initial state to `WaitingPeers`. Our best block: #{}; warp_sync: {:?}", best_block, warp_sync);
SyncState::WaitingPeers
},
_ => {
debug!(target: "sync", "Setting the initial state to `Idle`. Our best block: #{}", best_block);
SyncState::Idle
},
}
}
}
@@ -591,7 +617,7 @@ pub struct ChainSync {
state: SyncState,
/// Last block number for the start of sync
starting_block: BlockNumber,
/// Highest block number seen
/// Highest block number seen on the network.
highest_block: Option<BlockNumber>,
/// All connected peers
peers: Peers,
@@ -663,7 +689,7 @@ impl ChainSync {
sync
}
/// Returns synchonization status
/// Returns synchronization status
pub fn status(&self) -> SyncStatus {
let last_imported_number = self.new_blocks.last_imported_block_number();
SyncStatus {
@@ -721,7 +747,7 @@ impl ChainSync {
receiver
}
/// notify all subscibers of a new SyncState
/// Notify all subscribers of a new SyncState
fn notify_sync_state(&mut self, state: SyncState) {
// remove any sender whose receiving end has been dropped
self.status_sinks.retain(|sender| {
@@ -741,7 +767,7 @@ impl ChainSync {
fn reset(&mut self, io: &mut SyncIo, state: Option<SyncState>) {
self.new_blocks.reset();
let chain_info = io.chain().chain_info();
for (_, ref mut p) in &mut self.peers {
for (_, mut p) in &mut self.peers {
if p.block_set != Some(BlockSet::OldBlocks) {
p.reset_asking();
if p.difficulty.is_none() {
@@ -763,28 +789,30 @@ impl ChainSync {
pub fn reset_and_continue(&mut self, io: &mut SyncIo) {
trace!(target: "sync", "Restarting");
if self.state == SyncState::SnapshotData {
debug!(target:"sync", "Aborting snapshot restore");
debug!(target:"snapshot_sync", "Aborting snapshot restore");
io.snapshot_service().abort_restore();
}
self.snapshot.clear();
// Passing `None` here means we'll end up in either `SnapshotWaiting` or `Idle` depending on
// the warp sync settings.
self.reset(io, None);
self.continue_sync(io);
}
/// Remove peer from active peer set. Peer will be reactivated on the next sync
/// round.
fn deactivate_peer(&mut self, _io: &mut SyncIo, peer_id: PeerId) {
trace!(target: "sync", "Deactivating peer {}", peer_id);
fn deactivate_peer(&mut self, _io: &mut dyn SyncIo, peer_id: PeerId) {
debug!(target: "sync", "Deactivating peer {}", peer_id);
self.active_peers.remove(&peer_id);
}
fn maybe_start_snapshot_sync(&mut self, io: &mut SyncIo) {
/// Decide if we should start downloading a snapshot and from who. Called once per second.
fn maybe_start_snapshot_sync(&mut self, io: &mut dyn SyncIo) {
if !self.warp_sync.is_enabled() || io.snapshot_service().supported_versions().is_none() {
trace!(target: "sync", "Skipping warp sync. Disabled or not supported.");
return;
}
if self.state != SyncState::WaitingPeers && self.state != SyncState::Blocks && self.state != SyncState::Waiting {
trace!(target: "sync", "Skipping warp sync. State: {:?}", self.state);
use SyncState::*;
if self.state != WaitingPeers && self.state != Blocks && self.state != Waiting {
return;
}
// Make sure the snapshot block is not too far away from best block and network best block and
@@ -792,71 +820,112 @@ impl ChainSync {
let our_best_block = io.chain().chain_info().best_block_number;
let fork_block = self.fork_block.map_or(0, |(n, _)| n);
let (best_hash, max_peers, snapshot_peers) = {
let expected_warp_block = match self.warp_sync {
WarpSync::OnlyAndAfter(block) => block,
_ => 0,
};
//collect snapshot infos from peers
let snapshots = self.peers.iter()
.filter(|&(_, p)| p.is_allowed() && p.snapshot_number.map_or(false, |sn|
// Snapshot must be old enough that it's usefull to sync with it
our_best_block < sn && (sn - our_best_block) > SNAPSHOT_RESTORE_THRESHOLD &&
// Snapshot must have been taken after the Fork
sn > fork_block &&
// Snapshot must be greater than the warp barrier if any
sn > expected_warp_block &&
// If we know a highest block, snapshot must be recent enough
self.highest_block.map_or(true, |highest| {
highest < sn || (highest - sn) <= SNAPSHOT_RESTORE_THRESHOLD
})
))
.filter_map(|(p, peer)| peer.snapshot_hash.map(|hash| (p, hash.clone())))
.filter(|&(_, ref hash)| !self.snapshot.is_known_bad(hash));
let expected_warp_block = match self.warp_sync {
WarpSync::OnlyAndAfter(warp_block) => {
if our_best_block >= warp_block {
trace!(target: "snapshot_sync",
"Our best block (#{}) is already beyond the warp barrier block (#{})",
our_best_block, warp_block);
return;
}
warp_block
},
_ => 0,
};
// Collect snapshot info from peers and check if we can use their snapshots to sync.
let (best_snapshot_block, best_hash, max_peers, snapshot_peers) = {
let mut snapshots = self.peers.iter()
.filter(|&(_, p)|
// filter out expired peers and peers from whom we do not have fork confirmation.
p.is_allowed() &&
p.snapshot_number.map_or(false, |sn|
// Snapshot must be sufficiently better than what we have that it's useful to
// sync with it: more than 30k blocks beyond our best block
our_best_block < sn && (sn - our_best_block) > SNAPSHOT_RESTORE_THRESHOLD &&
// Snapshot must have been taken after the fork block (if any is configured)
sn > fork_block &&
// Snapshot must be greater or equal to the warp barrier, if any
sn >= expected_warp_block
)
)
.filter_map(|(p, peer)| {
peer.snapshot_hash.map(|hash| (p, hash))
.filter(|(_, hash)| !self.snapshot.is_known_bad(&hash) )
.and_then(|(p, hash)| peer.snapshot_number.map(|n| (*p, n, hash) ) )
})
.collect::<Vec<(PeerId, BlockNumber, H256)>>();
// Sort collection of peers by highest block number.
snapshots.sort_by(|&(_, ref b1, _), &(_, ref b2, _)| b2.cmp(b1) );
let mut snapshot_peers = HashMap::new();
let mut max_peers: usize = 0;
let mut best_hash = None;
for (p, hash) in snapshots {
let mut best_snapshot_block = None;
// Of the available snapshots, find the one seeded by the most peers. On a tie, the
// snapshot closest to the tip will be used (unfortunately this is the common case).
for (p, snapshot_block, hash) in snapshots {
let peers = snapshot_peers.entry(hash).or_insert_with(Vec::new);
peers.push(*p);
peers.push(p);
if peers.len() > max_peers {
trace!(target: "snapshot_sync", "{} is the new best snapshotting peer, has snapshot at block #{}/{}", p, snapshot_block, hash);
max_peers = peers.len();
best_hash = Some(hash);
best_snapshot_block = Some(snapshot_block);
}
}
(best_hash, max_peers, snapshot_peers)
(best_snapshot_block, best_hash, max_peers, snapshot_peers)
};
// If we've waited long enough (10sec), a single peer will have to be enough for the snapshot sync to start.
let timeout = (self.state == WaitingPeers) &&
self.sync_start_time.map_or(false, |t| t.elapsed() > WAIT_PEERS_TIMEOUT);
let timeout = (self.state == SyncState::WaitingPeers) && self.sync_start_time.map_or(false, |t| t.elapsed() > WAIT_PEERS_TIMEOUT);
if let (Some(hash), Some(peers)) = (best_hash, best_hash.map_or(None, |h| snapshot_peers.get(&h))) {
if let (Some(block), Some(hash), Some(peers)) = (
best_snapshot_block,
best_hash,
best_hash.map_or(None, |h| snapshot_peers.get(&h))
) {
trace!(target: "snapshot_sync", "We can sync a snapshot at #{:?}/{:?} from {} peer(s): {:?}",
best_snapshot_block, best_hash, max_peers, snapshot_peers.values());
if max_peers >= SNAPSHOT_MIN_PEERS {
trace!(target: "sync", "Starting confirmed snapshot sync {:?} with {:?}", hash, peers);
debug!(target: "snapshot_sync", "Starting confirmed snapshot sync for a snapshot at #{}/{:?} with peer {:?}", block, hash, peers);
self.start_snapshot_sync(io, peers);
} else if timeout {
trace!(target: "sync", "Starting unconfirmed snapshot sync {:?} with {:?}", hash, peers);
debug!(target: "snapshot_sync", "Starting unconfirmed snapshot sync for a snapshot at #{}/{:?} with peer {:?}", block, hash, peers);
self.start_snapshot_sync(io, peers);
} else {
trace!(target: "snapshot_sync", "Waiting a little more to let more snapshot peers connect.")
}
} else if timeout {
if !self.warp_sync.is_warp_only() {
debug!(target: "snapshot_sync", "Not syncing snapshots (or none found), proceeding with normal sync.");
self.set_state(SyncState::Idle);
self.continue_sync(io);
} else {
warn!(target: "snapshot_sync", "No snapshots currently available at #{}. Try using a smaller value for --warp-barrier", expected_warp_block);
}
} else if timeout && !self.warp_sync.is_warp_only() {
trace!(target: "sync", "No snapshots found, starting full sync");
self.set_state(SyncState::Idle);
self.continue_sync(io);
}
}
fn start_snapshot_sync(&mut self, io: &mut SyncIo, peers: &[PeerId]) {
/// Start a snapshot with all peers that we are not currently asking something else from. If
/// we're already snapshotting with a peer, set sync state to `SnapshotData` and continue
/// fetching the snapshot. Note that we only ever sync snapshots from one peer so here we send
/// out the request for a manifest to all the peers that have it and start syncing the snapshot
/// with the first that responds.
fn start_snapshot_sync(&mut self, io: &mut dyn SyncIo, peers: &[PeerId]) {
if !self.snapshot.have_manifest() {
for p in peers {
if self.peers.get(p).map_or(false, |p| p.asking == PeerAsking::Nothing) {
// When we get a response we call `SyncHandler::on_snapshot_manifest`
SyncRequester::request_snapshot_manifest(self, io, *p);
}
}
self.set_state(SyncState::SnapshotManifest);
trace!(target: "sync", "New snapshot sync with {:?}", peers);
trace!(target: "snapshot_sync", "New snapshot sync with {:?}", peers);
} else {
self.set_state(SyncState::SnapshotData);
trace!(target: "sync", "Resumed snapshot sync with {:?}", peers);
trace!(target: "snapshot_sync", "Resumed snapshot sync with {:?}", peers);
}
}
@@ -886,8 +955,9 @@ impl ChainSync {
}
}
/// Resume downloading
pub fn continue_sync(&mut self, io: &mut SyncIo) {
/// Resume downloading.
/// Called every `CONTINUE_SYNC_TIMER` (2.5sec)
pub fn continue_sync(&mut self, io: &mut dyn SyncIo) {
if self.state == SyncState::Waiting {
trace!(target: "sync", "Waiting for the block queue");
} else if self.state == SyncState::SnapshotWaiting {
@@ -903,7 +973,7 @@ impl ChainSync {
).collect();
if peers.len() > 0 {
trace!(
debug!(
target: "sync",
"Syncing with peers: {} active, {} available, {} total",
self.active_peers.len(), peers.len(), self.peers.len()
@@ -919,9 +989,8 @@ impl ChainSync {
}
}
if
(self.state == SyncState::Blocks || self.state == SyncState::NewBlocks) &&
!self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.block_set != Some(BlockSet::OldBlocks) && p.can_sync())
if (self.state == SyncState::Blocks || self.state == SyncState::NewBlocks)
&& !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.block_set != Some(BlockSet::OldBlocks) && p.can_sync())
{
self.complete_sync(io);
}
@@ -963,13 +1032,14 @@ impl ChainSync {
let higher_difficulty = peer_difficulty.map_or(true, |pd| pd > syncing_difficulty);
if force || higher_difficulty || self.old_blocks.is_some() {
match self.state {
SyncState::WaitingPeers => {
SyncState::WaitingPeers if peer_snapshot_number > 0 => {
trace!(
target: "sync",
"Checking snapshot sync: {} vs {} (peer: {})",
target: "snapshot_sync",
"{}: Potential snapshot sync peer; their highest block: #{} vs our highest: #{} (peer: {})",
peer_id,
peer_snapshot_number,
chain_info.best_block_number,
peer_id
io.peer_enode(peer_id).unwrap_or_else(|| "enode://???".to_string()),
);
self.maybe_start_snapshot_sync(io);
},
@@ -1014,17 +1084,18 @@ impl ChainSync {
},
SyncState::SnapshotData => {
match io.snapshot_service().status() {
RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } => {
RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, state_chunks, block_chunks } => {
// Initialize the snapshot if not already done
self.snapshot.initialize(io.snapshot_service());
self.snapshot.initialize(io.snapshot_service(), block_chunks as usize + state_chunks as usize);
if self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize > MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
trace!(target: "sync", "Snapshot queue full, pausing sync");
trace!(target: "snapshot_sync", "Snapshot queue full, pausing sync");
self.set_state(SyncState::SnapshotWaiting);
return;
}
},
RestorationStatus::Initializing { .. } => {
trace!(target: "warp", "Snapshot is stil initializing.");
RestorationStatus::Initializing { state_chunks, block_chunks, chunks_done } => {
debug!(target: "snapshot_sync", "Snapshot is initializing: state chunks={}, block chunks={}, chunks done={}",
state_chunks, block_chunks, chunks_done);
return;
},
_ => {
@@ -1039,16 +1110,17 @@ impl ChainSync {
},
SyncState::SnapshotManifest | //already downloading from other peer
SyncState::Waiting |
SyncState::SnapshotWaiting => ()
SyncState::SnapshotWaiting => (),
_ => ()
}
} else {
trace!(target: "sync", "Skipping peer {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state);
}
}
/// Clear all blocks/headers marked as being downloaded by a peer.
/// Clear all blocks/headers marked as being downloaded by us from a peer.
fn clear_peer_download(&mut self, peer_id: PeerId) {
if let Some(ref peer) = self.peers.get(&peer_id) {
if let Some(peer) = self.peers.get(&peer_id) {
match peer.asking {
PeerAsking::BlockHeaders => {
if let Some(ref hash) = peer.asking_hash {
@@ -1126,7 +1198,7 @@ impl ChainSync {
peer.expired = false;
peer.block_set = None;
if peer.asking != asking {
trace!(target:"sync", "Asking {:?} while expected {:?}", peer.asking, asking);
trace!(target:"sync", "{}: Asking {:?} while expected {:?}", peer_id, peer.asking, asking);
peer.asking = PeerAsking::Nothing;
return false;
} else {
@@ -1166,7 +1238,10 @@ impl ChainSync {
io.respond(StatusPacket.id(), packet.out())
}
pub fn maintain_peers(&mut self, io: &mut SyncIo) {
/// Check if any tasks we have on-going with a peer is taking too long (if so, disconnect them).
/// Also checks handshaking peers.
/// Called every `PEERS_TIMER` (0.7sec).
pub fn maintain_peers(&mut self, io: &mut dyn SyncIo) {
let tick = Instant::now();
let mut aborting = Vec::new();
for (peer_id, peer) in &self.peers {
@@ -1181,7 +1256,7 @@ impl ChainSync {
PeerAsking::SnapshotData => elapsed > SNAPSHOT_DATA_TIMEOUT,
};
if timeout {
debug!(target:"sync", "Timeout {}", peer_id);
debug!(target:"sync", "Peer {} timeout while we were asking them for {:?}; disconnecting.", peer_id, peer.asking);
io.disconnect_peer(*peer_id);
aborting.push(*peer_id);
}
@@ -1215,24 +1290,24 @@ impl ChainSync {
SyncState::SnapshotWaiting => {
match io.snapshot_service().status() {
RestorationStatus::Inactive => {
trace!(target:"sync", "Snapshot restoration is complete");
trace!(target:"snapshot_sync", "Snapshot restoration is complete");
self.restart(io);
},
RestorationStatus::Initializing { .. } => {
trace!(target:"sync", "Snapshot restoration is initializing");
trace!(target:"snapshot_sync", "Snapshot restoration is initializing");
},
RestorationStatus::Finalizing { .. } => {
trace!(target:"sync", "Snapshot finalizing restoration");
trace!(target:"snapshot_sync", "Snapshot finalizing restoration");
},
RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } => {
if !self.snapshot.is_complete() && self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize <= MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
trace!(target:"sync", "Resuming snapshot sync");
trace!(target:"snapshot_sync", "Resuming snapshot sync");
self.set_state(SyncState::SnapshotData);
self.continue_sync(io);
}
},
RestorationStatus::Failed => {
trace!(target: "sync", "Snapshot restoration aborted");
trace!(target: "snapshot_sync", "Snapshot restoration aborted");
self.set_state(SyncState::WaitingPeers);
self.snapshot.clear();
self.continue_sync(io);
@@ -1285,8 +1360,9 @@ impl ChainSync {
).collect()
}
/// Maintain other peers. Send out any new blocks and transactions
pub fn maintain_sync(&mut self, io: &mut SyncIo) {
/// Maintain other peers. Send out any new blocks and transactions. Called every
/// `MAINTAIN_SYNC_TIMER` (1.1sec).
pub fn maintain_sync(&mut self, io: &mut dyn SyncIo) {
self.maybe_start_snapshot_sync(io);
self.check_resume(io);
}
@@ -1332,8 +1408,9 @@ impl ChainSync {
SyncHandler::on_peer_connected(self, io, peer);
}
/// propagates new transactions to all peers
pub fn propagate_new_transactions(&mut self, io: &mut SyncIo) {
/// Propagates new transactions to all peers.
/// Called every `TX_TIMER` (1.3sec).
pub fn propagate_new_transactions(&mut self, io: &mut dyn SyncIo) {
let deadline = Instant::now() + Duration::from_millis(500);
SyncPropagator::propagate_new_transactions(self, io, || {
if deadline > Instant::now() {

View File

@@ -81,11 +81,11 @@ impl SyncRequester {
SyncRequester::send_request(sync, io, peer_id, PeerAsking::ForkHeader, GetBlockHeadersPacket, rlp.out());
}
/// Find some headers or blocks to download for a peer.
pub fn request_snapshot_data(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
/// Find some headers or blocks to download from a peer.
pub fn request_snapshot_data(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId) {
// find chunk data to download
if let Some(hash) = sync.snapshot.needed_chunk() {
if let Some(ref mut peer) = sync.peers.get_mut(&peer_id) {
if let Some(mut peer) = sync.peers.get_mut(&peer_id) {
peer.asking_snapshot_data = Some(hash.clone());
}
SyncRequester::request_snapshot_chunk(sync, io, peer_id, &hash);
@@ -93,10 +93,9 @@ impl SyncRequester {
}
/// Request snapshot manifest from a peer.
pub fn request_snapshot_manifest(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId) {
trace!(target: "sync", "{} <- GetSnapshotManifest", peer_id);
let rlp = RlpStream::new_list(0);
SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, GetSnapshotManifestPacket, rlp.out());
pub fn request_snapshot_manifest(sync: &mut ChainSync, io: &mut dyn SyncIo, peer_id: PeerId) {
trace!(target: "sync", "{}: requesting a snapshot manifest", peer_id);
SyncRequester::send_request(sync, io, peer_id, PeerAsking::SnapshotManifest, GetSnapshotManifestPacket, rlp::EMPTY_LIST_RLP.to_vec());
}
/// Request headers from a peer by block hash

View File

@@ -108,7 +108,7 @@ impl SyncSupplier {
debug!(target:"sync", "Unexpected packet {} from unregistered peer: {}:{}", packet_id, peer, io.peer_version(peer));
return;
}
debug!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id);
trace!(target: "sync", "{} -> Dispatching packet: {}", peer, packet_id);
match id {
ConsensusDataPacket => {

View File

@@ -30,6 +30,7 @@ extern crate ethereum_types;
extern crate ethkey;
extern crate ethstore;
extern crate fastmap;
extern crate indexmap;
extern crate keccak_hash as hash;
extern crate parity_bytes as bytes;
extern crate parity_runtime;

View File

@@ -21,39 +21,55 @@ use hash::keccak;
use std::collections::HashSet;
use std::iter::FromIterator;
use indexmap::IndexSet;
#[derive(PartialEq, Eq, Debug)]
/// The type of data contained in a chunk: state or block.
pub enum ChunkType {
/// The chunk contains state data (aka account data).
State(H256),
/// The chunk contains block data.
Block(H256),
}
#[derive(MallocSizeOf)]
#[derive(Default, MallocSizeOf)]
pub struct Snapshot {
pending_state_chunks: Vec<H256>,
pending_block_chunks: Vec<H256>,
/// List of hashes of the state chunks we need to complete the warp sync from this snapshot.
/// These hashes are contained in the Manifest we downloaded from the peer(s).
/// Note: this is an ordered set so that state restoration happens in order, which keeps
/// memory usage down.
// See https://github.com/paritytech/parity-common/issues/255
#[ignore_malloc_size_of = "no impl for IndexSet (yet)"]
pending_state_chunks: IndexSet<H256>,
/// List of hashes of the block chunks we need to complete the warp sync from this snapshot.
/// These hashes are contained in the Manifest we downloaded from the peer(s).
/// Note: this is an ordered set so that state restoration happens in order, which keeps
/// memory usage down.
// See https://github.com/paritytech/parity-common/issues/255
#[ignore_malloc_size_of = "no impl for IndexSet (yet)"]
pending_block_chunks: IndexSet<H256>,
/// Set of hashes of chunks we are currently downloading.
downloading_chunks: HashSet<H256>,
/// The set of chunks (block or state) that we have successfully downloaded.
completed_chunks: HashSet<H256>,
/// The hash of the the `ManifestData` RLP that we're downloading.
snapshot_hash: Option<H256>,
/// Total number of chunks in the current snapshot.
total_chunks: Option<usize>,
/// Set of snapshot hashes we failed to import. We will not try to sync with
/// this snapshot again until restart.
bad_hashes: HashSet<H256>,
initialized: bool,
}
impl Snapshot {
/// Create a new instance.
pub fn new() -> Snapshot {
Snapshot {
pending_state_chunks: Vec::new(),
pending_block_chunks: Vec::new(),
downloading_chunks: HashSet::new(),
completed_chunks: HashSet::new(),
snapshot_hash: None,
bad_hashes: HashSet::new(),
initialized: false,
}
pub fn new() -> Self {
Default::default()
}
/// Sync the Snapshot completed chunks with the Snapshot Service
pub fn initialize(&mut self, snapshot_service: &SnapshotService) {
pub fn initialize(&mut self, snapshot_service: &dyn SnapshotService, total_chunks: usize) {
if self.initialized {
return;
}
@@ -63,111 +79,122 @@ impl Snapshot {
}
trace!(
target: "snapshot",
"Snapshot is now initialized with {} completed chunks.",
self.completed_chunks.len(),
target: "snapshot_sync",
"Snapshot initialized. {}/{} completed chunks.",
self.completed_chunks.len(), total_chunks
);
self.total_chunks = Some(total_chunks);
self.initialized = true;
}
/// Clear everything.
/// Clear everything and set `initialized` to false.
pub fn clear(&mut self) {
self.pending_state_chunks.clear();
self.pending_block_chunks.clear();
self.downloading_chunks.clear();
self.completed_chunks.clear();
self.snapshot_hash = None;
self.total_chunks = None;
self.initialized = false;
}
/// Check if currently downloading a snapshot.
/// Check if we're currently downloading a snapshot.
pub fn have_manifest(&self) -> bool {
self.snapshot_hash.is_some()
}
/// Reset collection for a manifest RLP
/// Clear the `Snapshot` and reset it with data from a `ManifestData` (i.e. the lists of
/// block&state chunk hashes contained in the `ManifestData`).
pub fn reset_to(&mut self, manifest: &ManifestData, hash: &H256) {
self.clear();
self.pending_state_chunks = manifest.state_hashes.clone();
self.pending_block_chunks = manifest.block_hashes.clone();
self.pending_state_chunks = IndexSet::from_iter(manifest.state_hashes.clone());
self.pending_block_chunks = IndexSet::from_iter(manifest.block_hashes.clone());
self.total_chunks = Some(self.pending_block_chunks.len() + self.pending_state_chunks.len());
self.snapshot_hash = Some(hash.clone());
}
/// Validate chunk and mark it as downloaded
/// Check if the the chunk is known, i.e. downloaded already or currently downloading; if so add
/// it to the `completed_chunks` set.
/// Returns a `ChunkType` with the hash of the chunk.
pub fn validate_chunk(&mut self, chunk: &[u8]) -> Result<ChunkType, ()> {
let hash = keccak(chunk);
if self.completed_chunks.contains(&hash) {
trace!(target: "sync", "Ignored proccessed chunk: {:x}", hash);
trace!(target: "snapshot_sync", "Already proccessed chunk {:x}. Ignoring.", hash);
return Err(());
}
self.downloading_chunks.remove(&hash);
if self.pending_block_chunks.iter().any(|h| h == &hash) {
self.completed_chunks.insert(hash.clone());
return Ok(ChunkType::Block(hash));
}
if self.pending_state_chunks.iter().any(|h| h == &hash) {
self.completed_chunks.insert(hash.clone());
return Ok(ChunkType::State(hash));
}
trace!(target: "sync", "Ignored unknown chunk: {:x}", hash);
Err(())
self.pending_block_chunks.take(&hash)
.and_then(|h| {
self.completed_chunks.insert(h);
Some(ChunkType::Block(hash))
})
.or(
self.pending_state_chunks.take(&hash)
.and_then(|h| {
self.completed_chunks.insert(h);
Some(ChunkType::State(hash))
})
).ok_or_else(|| {
trace!(target: "snapshot_sync", "Ignoring unknown chunk: {:x}", hash);
()
})
}
/// Find a chunk to download
/// Pick a chunk to download.
/// Note: the order in which chunks are processed is somewhat important. The account state
/// sometimes spills over into more than one chunk and the parts of state that are missing
/// pieces are held in memory while waiting for the next chunk(s) to show up. This means that
/// when chunks are processed out-of-order, memory usage goes up, sometimes significantly (see
/// e.g. https://github.com/paritytech/parity-ethereum/issues/8825).
pub fn needed_chunk(&mut self) -> Option<H256> {
// Find next needed chunk: first block, then state chunks
let chunk = {
let chunk_filter = |h| !self.downloading_chunks.contains(h) && !self.completed_chunks.contains(h);
let needed_block_chunk = self.pending_block_chunks.iter()
.filter(|&h| chunk_filter(h))
let filter = |h| !self.downloading_chunks.contains(h) && !self.completed_chunks.contains(h);
self.pending_block_chunks.iter()
.find(|&h| filter(h))
.or(self.pending_state_chunks.iter()
.find(|&h| filter(h))
)
.map(|h| *h)
.next();
// If no block chunks to download, get the state chunks
if needed_block_chunk.is_none() {
self.pending_state_chunks.iter()
.filter(|&h| chunk_filter(h))
.map(|h| *h)
.next()
} else {
needed_block_chunk
}
};
if let Some(hash) = chunk {
self.downloading_chunks.insert(hash.clone());
}
chunk
}
/// Remove a chunk from the set of chunks we're interested in downloading.
pub fn clear_chunk_download(&mut self, hash: &H256) {
self.downloading_chunks.remove(hash);
}
// note snapshot hash as bad.
/// Mark a snapshot hash as bad.
pub fn note_bad(&mut self, hash: H256) {
self.bad_hashes.insert(hash);
}
// whether snapshot hash is known to be bad.
/// Whether a snapshot hash is known to be bad.
pub fn is_known_bad(&self, hash: &H256) -> bool {
self.bad_hashes.contains(hash)
}
/// Hash of the snapshot we're currently downloading/importing.
pub fn snapshot_hash(&self) -> Option<H256> {
self.snapshot_hash
}
/// Total number of chunks in the snapshot we're currently working on (state + block chunks).
pub fn total_chunks(&self) -> usize {
self.pending_block_chunks.len() + self.pending_state_chunks.len()
self.total_chunks.unwrap_or_default()
}
/// Number of chunks we've processed so far (state and block chunks).
pub fn done_chunks(&self) -> usize {
self.completed_chunks.len()
}
/// Are we done downloading all chunks?
pub fn is_complete(&self) -> bool {
self.total_chunks() == self.completed_chunks.len()
}
@@ -219,25 +246,30 @@ mod test {
let mut snapshot = Snapshot::new();
let (manifest, mhash, state_chunks, block_chunks) = test_manifest();
snapshot.reset_to(&manifest, &mhash);
assert_eq!(snapshot.done_chunks(), 0);
assert!(snapshot.validate_chunk(&H256::random().as_bytes().to_vec()).is_err());
assert_eq!(snapshot.done_chunks(), 0, "no chunks done at outset");
assert!(snapshot.validate_chunk(&H256::random().as_bytes().to_vec()).is_err(), "random chunk is invalid");
// request all 20 + 20 chunks
let requested: Vec<H256> = (0..40).map(|_| snapshot.needed_chunk().unwrap()).collect();
assert!(snapshot.needed_chunk().is_none());
assert!(snapshot.needed_chunk().is_none(), "no chunks left after all are drained");
let requested_all_block_chunks = manifest.block_hashes.iter()
.all(|h| requested.iter().any(|rh| rh == h));
assert!(requested_all_block_chunks);
assert!(requested_all_block_chunks, "all block chunks in the manifest accounted for");
let requested_all_state_chunks = manifest.state_hashes.iter()
.all(|h| requested.iter().any(|rh| rh == h));
assert!(requested_all_state_chunks);
assert!(requested_all_state_chunks, "all state chunks in the manifest accounted for");
assert_eq!(snapshot.downloading_chunks.len(), 40);
assert_eq!(snapshot.downloading_chunks.len(), 40, "all requested chunks are downloading");
assert_eq!(snapshot.validate_chunk(&state_chunks[4]), Ok(ChunkType::State(manifest.state_hashes[4].clone())));
assert_eq!(snapshot.completed_chunks.len(), 1);
assert_eq!(snapshot.downloading_chunks.len(), 39);
assert_eq!(
snapshot.validate_chunk(&state_chunks[4]),
Ok(ChunkType::State(manifest.state_hashes[4].clone())),
"4th state chunk hash validates as such"
);
assert_eq!(snapshot.completed_chunks.len(), 1, "after validating a chunk, it's in the completed set");
assert_eq!(snapshot.downloading_chunks.len(), 39, "after validating a chunk, there's one less in the downloading set");
assert_eq!(snapshot.validate_chunk(&block_chunks[10]), Ok(ChunkType::Block(manifest.block_hashes[10].clone())));
assert_eq!(snapshot.completed_chunks.len(), 2);
@@ -255,7 +287,7 @@ mod test {
}
}
assert!(snapshot.is_complete());
assert!(snapshot.is_complete(), "when all chunks have been validated, we're done");
assert_eq!(snapshot.done_chunks(), 40);
assert_eq!(snapshot.done_chunks(), snapshot.total_chunks());
assert_eq!(snapshot.snapshot_hash(), Some(keccak(manifest.into_rlp())));

View File

@@ -44,6 +44,8 @@ pub trait SyncIo {
fn peer_version(&self, peer_id: PeerId) -> ClientVersion {
ClientVersion::from(peer_id.to_string())
}
/// Returns the peer enode string
fn peer_enode(&self, peer_id: PeerId) -> Option<String>;
/// Returns information on p2p session
fn peer_session_info(&self, peer_id: PeerId) -> Option<SessionInfo>;
/// Maximum mutually supported ETH protocol version
@@ -106,22 +108,26 @@ impl<'s> SyncIo for NetSyncIo<'s> {
self.chain
}
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {
self.chain_overlay
fn snapshot_service(&self) -> &dyn SnapshotService {
self.snapshot_service
}
fn snapshot_service(&self) -> &SnapshotService {
self.snapshot_service
fn peer_version(&self, peer_id: PeerId) -> ClientVersion {
self.network.peer_client_version(peer_id)
}
fn peer_enode(&self, peer_id: PeerId) -> Option<String> {
self.network.session_info(peer_id).and_then(|info| {
info.id.map(|node_id| {
format!("enode:://{}@{}", node_id, info.remote_address)
})
})
}
fn peer_session_info(&self, peer_id: PeerId) -> Option<SessionInfo> {
self.network.session_info(peer_id)
}
fn is_expired(&self) -> bool {
self.network.is_expired()
}
fn eth_protocol_version(&self, peer_id: PeerId) -> u8 {
self.network.protocol_version(self.network.subprotocol_name(), peer_id).unwrap_or(0)
}
@@ -130,8 +136,12 @@ impl<'s> SyncIo for NetSyncIo<'s> {
self.network.protocol_version(*protocol, peer_id).unwrap_or(0)
}
fn peer_version(&self, peer_id: PeerId) -> ClientVersion {
self.network.peer_client_version(peer_id)
fn is_expired(&self) -> bool {
self.network.is_expired()
}
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {
self.chain_overlay
}
fn payload_soft_limit(&self) -> usize {

View File

@@ -92,25 +92,17 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
self.to_disconnect.insert(peer_id);
}
fn is_expired(&self) -> bool {
false
}
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), network::Error> {
self.packets.push(TestPacket {
data: data,
packet_id: packet_id,
recipient: self.sender.unwrap()
});
self.packets.push(
TestPacket { data, packet_id, recipient: self.sender.unwrap() }
);
Ok(())
}
fn send(&mut self,peer_id: PeerId, packet_id: SyncPacket, data: Vec<u8>) -> Result<(), network::Error> {
self.packets.push(TestPacket {
data: data,
packet_id: packet_id.id(),
recipient: peer_id,
});
self.packets.push(
TestPacket { data, packet_id: packet_id.id(), recipient: peer_id }
);
Ok(())
}
@@ -118,16 +110,19 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
&*self.chain
}
fn peer_version(&self, peer_id: PeerId) -> ClientVersion {
let client_id = self.peers_info.get(&peer_id)
.cloned()
.unwrap_or_else(|| peer_id.to_string());
ClientVersion::from(client_id)
fn snapshot_service(&self) -> &dyn SnapshotService {
self.snapshot_service
}
fn snapshot_service(&self) -> &SnapshotService {
self.snapshot_service
fn peer_version(&self, peer_id: PeerId) -> ClientVersion {
self.peers_info.get(&peer_id)
.cloned()
.unwrap_or_else(|| peer_id.to_string())
.into()
}
fn peer_enode(&self, _peer_id: usize) -> Option<String> {
unimplemented!()
}
fn peer_session_info(&self, _peer_id: PeerId) -> Option<SessionInfo> {
@@ -142,6 +137,10 @@ impl<'p, C> SyncIo for TestIo<'p, C> where C: FlushingBlockChainClient, C: 'p {
if protocol == &WARP_SYNC_PROTOCOL_ID { PAR_PROTOCOL_VERSION_3.0 } else { self.eth_protocol_version(peer_id) }
}
fn is_expired(&self) -> bool {
false
}
fn chain_overlay(&self) -> &RwLock<HashMap<BlockNumber, Bytes>> {
&self.overlay
}