|
|
|
|
@@ -123,6 +123,7 @@ const MAX_NEW_BLOCK_AGE: BlockNumber = 20;
|
|
|
|
|
const MAX_TRANSACTION_SIZE: usize = 300*1024;
|
|
|
|
|
// Min number of blocks to be behind for a snapshot sync
|
|
|
|
|
const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 100000;
|
|
|
|
|
const SNAPSHOT_MIN_PEERS: usize = 3;
|
|
|
|
|
|
|
|
|
|
const STATUS_PACKET: u8 = 0x00;
|
|
|
|
|
const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
|
|
|
|
|
@@ -147,21 +148,27 @@ const SNAPSHOT_DATA_PACKET: u8 = 0x14;
|
|
|
|
|
|
|
|
|
|
pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x15;
|
|
|
|
|
|
|
|
|
|
const HEADERS_TIMEOUT_SEC: f64 = 15f64;
|
|
|
|
|
const BODIES_TIMEOUT_SEC: f64 = 10f64;
|
|
|
|
|
const RECEIPTS_TIMEOUT_SEC: f64 = 10f64;
|
|
|
|
|
const FORK_HEADER_TIMEOUT_SEC: f64 = 3f64;
|
|
|
|
|
const SNAPSHOT_MANIFEST_TIMEOUT_SEC: f64 = 3f64;
|
|
|
|
|
const SNAPSHOT_DATA_TIMEOUT_SEC: f64 = 60f64;
|
|
|
|
|
const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;
|
|
|
|
|
|
|
|
|
|
const WAIT_PEERS_TIMEOUT_SEC: u64 = 5;
|
|
|
|
|
const STATUS_TIMEOUT_SEC: u64 = 5;
|
|
|
|
|
const HEADERS_TIMEOUT_SEC: u64 = 15;
|
|
|
|
|
const BODIES_TIMEOUT_SEC: u64 = 10;
|
|
|
|
|
const RECEIPTS_TIMEOUT_SEC: u64 = 10;
|
|
|
|
|
const FORK_HEADER_TIMEOUT_SEC: u64 = 3;
|
|
|
|
|
const SNAPSHOT_MANIFEST_TIMEOUT_SEC: u64 = 3;
|
|
|
|
|
const SNAPSHOT_DATA_TIMEOUT_SEC: u64 = 60;
|
|
|
|
|
|
|
|
|
|
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
|
|
|
|
|
/// Sync state
|
|
|
|
|
pub enum SyncState {
|
|
|
|
|
/// Waiting for pv64 peers to start snapshot syncing
|
|
|
|
|
/// Collecting enough peers to start syncing.
|
|
|
|
|
WaitingPeers,
|
|
|
|
|
/// Waiting for snapshot manifest download
|
|
|
|
|
SnapshotManifest,
|
|
|
|
|
/// Downloading snapshot data
|
|
|
|
|
SnapshotData,
|
|
|
|
|
/// Waiting for snapshot restoration to complete
|
|
|
|
|
/// Waiting for snapshot restoration progress.
|
|
|
|
|
SnapshotWaiting,
|
|
|
|
|
/// Downloading new blocks
|
|
|
|
|
Blocks,
|
|
|
|
|
@@ -276,7 +283,7 @@ struct PeerInfo {
|
|
|
|
|
/// Holds requested snapshot chunk hash if any.
|
|
|
|
|
asking_snapshot_data: Option<H256>,
|
|
|
|
|
/// Request timestamp
|
|
|
|
|
ask_time: f64,
|
|
|
|
|
ask_time: u64,
|
|
|
|
|
/// Holds a set of transactions recently sent to this peer to avoid spamming.
|
|
|
|
|
last_sent_transactions: HashSet<H256>,
|
|
|
|
|
/// Pending request is expired and result should be ignored
|
|
|
|
|
@@ -324,10 +331,13 @@ pub struct ChainSync {
|
|
|
|
|
network_id: U256,
|
|
|
|
|
/// Optional fork block to check
|
|
|
|
|
fork_block: Option<(BlockNumber, H256)>,
|
|
|
|
|
/// Snapshot sync allowed.
|
|
|
|
|
snapshot_sync_enabled: bool,
|
|
|
|
|
/// Snapshot downloader.
|
|
|
|
|
snapshot: Snapshot,
|
|
|
|
|
/// Connected peers pending Status message.
|
|
|
|
|
/// Value is request timestamp.
|
|
|
|
|
handshaking_peers: HashMap<PeerId, u64>,
|
|
|
|
|
/// Sync start timestamp. Measured when first peer is connected
|
|
|
|
|
sync_start_time: Option<u64>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
|
|
|
|
|
@@ -337,20 +347,21 @@ impl ChainSync {
|
|
|
|
|
pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync {
|
|
|
|
|
let chain_info = chain.chain_info();
|
|
|
|
|
let mut sync = ChainSync {
|
|
|
|
|
state: SyncState::Idle,
|
|
|
|
|
state: if config.warp_sync { SyncState::WaitingPeers } else { SyncState::Idle },
|
|
|
|
|
starting_block: chain.chain_info().best_block_number,
|
|
|
|
|
highest_block: None,
|
|
|
|
|
peers: HashMap::new(),
|
|
|
|
|
handshaking_peers: HashMap::new(),
|
|
|
|
|
active_peers: HashSet::new(),
|
|
|
|
|
new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number),
|
|
|
|
|
old_blocks: None,
|
|
|
|
|
last_sent_block_number: 0,
|
|
|
|
|
network_id: config.network_id,
|
|
|
|
|
fork_block: config.fork_block,
|
|
|
|
|
snapshot_sync_enabled: config.warp_sync,
|
|
|
|
|
snapshot: Snapshot::new(),
|
|
|
|
|
sync_start_time: None,
|
|
|
|
|
};
|
|
|
|
|
sync.init_downloaders(chain);
|
|
|
|
|
sync.update_targets(chain);
|
|
|
|
|
sync
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -381,7 +392,7 @@ impl ChainSync {
|
|
|
|
|
/// Returns information on peers connections
|
|
|
|
|
pub fn peers(&self, io: &SyncIo) -> Vec<PeerInfoDigest> {
|
|
|
|
|
self.peers.iter()
|
|
|
|
|
.filter_map(|(&peer_id, ref peer_data)|
|
|
|
|
|
.filter_map(|(&peer_id, peer_data)|
|
|
|
|
|
io.peer_session_info(peer_id).map(|session_info|
|
|
|
|
|
PeerInfoDigest {
|
|
|
|
|
id: session_info.id.map(|id| id.hex()),
|
|
|
|
|
@@ -442,20 +453,67 @@ impl ChainSync {
|
|
|
|
|
self.active_peers.remove(&peer_id);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn start_snapshot_sync(&mut self, io: &mut SyncIo, peer_id: PeerId) {
|
|
|
|
|
fn maybe_start_snapshot_sync(&mut self, io: &mut SyncIo) {
|
|
|
|
|
if self.state != SyncState::WaitingPeers {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
let best_block = io.chain().chain_info().best_block_number;
|
|
|
|
|
|
|
|
|
|
let (best_hash, max_peers, snapshot_peers) = {
|
|
|
|
|
//collect snapshot infos from peers
|
|
|
|
|
let snapshots = self.peers.iter()
|
|
|
|
|
.filter(|&(_, p)| p.is_allowed() && p.snapshot_number.map_or(false, |sn| best_block < sn && (sn - best_block) > SNAPSHOT_RESTORE_THRESHOLD))
|
|
|
|
|
.filter_map(|(p, peer)| peer.snapshot_hash.map(|hash| (p, hash.clone())));
|
|
|
|
|
|
|
|
|
|
let mut snapshot_peers = HashMap::new();
|
|
|
|
|
let mut max_peers: usize = 0;
|
|
|
|
|
let mut best_hash = None;
|
|
|
|
|
for (p, hash) in snapshots {
|
|
|
|
|
let peers = snapshot_peers.entry(hash).or_insert_with(Vec::new);
|
|
|
|
|
peers.push(*p);
|
|
|
|
|
if peers.len() > max_peers {
|
|
|
|
|
max_peers = peers.len();
|
|
|
|
|
best_hash = Some(hash);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
(best_hash, max_peers, snapshot_peers)
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let timeout = self.sync_start_time.map_or(false, |t| ((time::precise_time_ns() - t) / 1_000_000_000) > WAIT_PEERS_TIMEOUT_SEC);
|
|
|
|
|
|
|
|
|
|
if let (Some(hash), Some(peers)) = (best_hash, best_hash.map_or(None, |h| snapshot_peers.get(&h))) {
|
|
|
|
|
if max_peers >= SNAPSHOT_MIN_PEERS {
|
|
|
|
|
trace!(target: "sync", "Starting confirmed snapshot sync {:?} with {:?}", hash, peers);
|
|
|
|
|
self.start_snapshot_sync(io, peers);
|
|
|
|
|
} else if timeout {
|
|
|
|
|
trace!(target: "sync", "Starting unconfirmed snapshot sync {:?} with {:?}", hash, peers);
|
|
|
|
|
self.start_snapshot_sync(io, peers);
|
|
|
|
|
}
|
|
|
|
|
} else if timeout {
|
|
|
|
|
trace!(target: "sync", "No snapshots found, starting full sync");
|
|
|
|
|
self.state = SyncState::Idle;
|
|
|
|
|
self.continue_sync(io);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn start_snapshot_sync(&mut self, io: &mut SyncIo, peers: &[PeerId]) {
|
|
|
|
|
self.snapshot.clear();
|
|
|
|
|
self.request_snapshot_manifest(io, peer_id);
|
|
|
|
|
for p in peers {
|
|
|
|
|
if self.peers.get(p).map_or(false, |p| p.asking == PeerAsking::Nothing) {
|
|
|
|
|
self.request_snapshot_manifest(io, *p);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
self.state = SyncState::SnapshotManifest;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Restart sync disregarding the block queue status. May end up re-downloading up to QUEUE_SIZE blocks
|
|
|
|
|
pub fn restart(&mut self, io: &mut SyncIo) {
|
|
|
|
|
self.init_downloaders(io.chain());
|
|
|
|
|
self.update_targets(io.chain());
|
|
|
|
|
self.reset_and_continue(io);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Restart sync after bad block has been detected. May end up re-downloading up to QUEUE_SIZE blocks
|
|
|
|
|
fn init_downloaders(&mut self, chain: &BlockChainClient) {
|
|
|
|
|
/// Update sync after the blockchain has been changed externally.
|
|
|
|
|
pub fn update_targets(&mut self, chain: &BlockChainClient) {
|
|
|
|
|
// Do not assume that the block queue/chain still has our last_imported_block
|
|
|
|
|
let chain = chain.chain_info();
|
|
|
|
|
self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number);
|
|
|
|
|
@@ -475,6 +533,7 @@ impl ChainSync {
|
|
|
|
|
|
|
|
|
|
/// Called by peer to report status
|
|
|
|
|
fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
|
|
|
|
self.handshaking_peers.remove(&peer_id);
|
|
|
|
|
let protocol_version: u8 = try!(r.val_at(0));
|
|
|
|
|
let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id) != 0;
|
|
|
|
|
let peer = PeerInfo {
|
|
|
|
|
@@ -486,7 +545,7 @@ impl ChainSync {
|
|
|
|
|
asking: PeerAsking::Nothing,
|
|
|
|
|
asking_blocks: Vec::new(),
|
|
|
|
|
asking_hash: None,
|
|
|
|
|
ask_time: 0f64,
|
|
|
|
|
ask_time: 0,
|
|
|
|
|
last_sent_transactions: HashSet::new(),
|
|
|
|
|
expired: false,
|
|
|
|
|
confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
|
|
|
|
|
@@ -496,7 +555,12 @@ impl ChainSync {
|
|
|
|
|
block_set: None,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
|
|
|
|
|
if self.sync_start_time.is_none() {
|
|
|
|
|
self.sync_start_time = Some(time::precise_time_ns());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{}, snapshot:{:?})",
|
|
|
|
|
peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis, peer.snapshot_number);
|
|
|
|
|
if io.is_expired() {
|
|
|
|
|
trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
|
|
|
|
|
return Ok(());
|
|
|
|
|
@@ -578,7 +642,7 @@ impl ChainSync {
|
|
|
|
|
}
|
|
|
|
|
let item_count = r.item_count();
|
|
|
|
|
trace!(target: "sync", "{} -> BlockHeaders ({} entries), state = {:?}, set = {:?}", peer_id, item_count, self.state, block_set);
|
|
|
|
|
if self.state == SyncState::Idle && self.old_blocks.is_none() {
|
|
|
|
|
if (self.state == SyncState::Idle || self.state == SyncState::WaitingPeers) && self.old_blocks.is_none() {
|
|
|
|
|
trace!(target: "sync", "Ignored unexpected block headers");
|
|
|
|
|
self.continue_sync(io);
|
|
|
|
|
return Ok(());
|
|
|
|
|
@@ -875,7 +939,7 @@ impl ChainSync {
|
|
|
|
|
}
|
|
|
|
|
self.clear_peer_download(peer_id);
|
|
|
|
|
if !self.reset_peer_asking(peer_id, PeerAsking::SnapshotManifest) || self.state != SyncState::SnapshotManifest {
|
|
|
|
|
trace!(target: "sync", "{}: Ignored unexpected manifest", peer_id);
|
|
|
|
|
trace!(target: "sync", "{}: Ignored unexpected/expired manifest", peer_id);
|
|
|
|
|
self.continue_sync(io);
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
@@ -918,7 +982,7 @@ impl ChainSync {
|
|
|
|
|
match io.snapshot_service().status() {
|
|
|
|
|
RestorationStatus::Inactive | RestorationStatus::Failed => {
|
|
|
|
|
trace!(target: "sync", "{}: Snapshot restoration aborted", peer_id);
|
|
|
|
|
self.state = SyncState::Idle;
|
|
|
|
|
self.state = SyncState::WaitingPeers;
|
|
|
|
|
self.snapshot.clear();
|
|
|
|
|
self.continue_sync(io);
|
|
|
|
|
return Ok(());
|
|
|
|
|
@@ -960,6 +1024,7 @@ impl ChainSync {
|
|
|
|
|
/// Called by peer when it is disconnecting
|
|
|
|
|
pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) {
|
|
|
|
|
trace!(target: "sync", "== Disconnecting {}: {}", peer, io.peer_info(peer));
|
|
|
|
|
self.handshaking_peers.remove(&peer);
|
|
|
|
|
if self.peers.contains_key(&peer) {
|
|
|
|
|
debug!(target: "sync", "Disconnected {}", peer);
|
|
|
|
|
self.clear_peer_download(peer);
|
|
|
|
|
@@ -975,15 +1040,13 @@ impl ChainSync {
|
|
|
|
|
if let Err(e) = self.send_status(io, peer) {
|
|
|
|
|
debug!(target:"sync", "Error sending status request: {:?}", e);
|
|
|
|
|
io.disable_peer(peer);
|
|
|
|
|
} else {
|
|
|
|
|
self.handshaking_peers.insert(peer, time::precise_time_ns());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Resume downloading
|
|
|
|
|
fn continue_sync(&mut self, io: &mut SyncIo) {
|
|
|
|
|
if self.state != SyncState::Waiting && self.state != SyncState::SnapshotWaiting
|
|
|
|
|
&& !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.block_set != Some(BlockSet::OldBlocks) && p.can_sync()) {
|
|
|
|
|
self.complete_sync(io);
|
|
|
|
|
}
|
|
|
|
|
let mut peers: Vec<(PeerId, U256, u8)> = self.peers.iter().filter_map(|(k, p)|
|
|
|
|
|
if p.can_sync() { Some((*k, p.difficulty.unwrap_or_else(U256::zero), p.protocol_version)) } else { None }).collect();
|
|
|
|
|
thread_rng().shuffle(&mut peers); //TODO: sort by rating
|
|
|
|
|
@@ -995,6 +1058,11 @@ impl ChainSync {
|
|
|
|
|
self.sync_peer(io, p, false);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (self.state != SyncState::WaitingPeers && self.state != SyncState::SnapshotWaiting && self.state != SyncState::Waiting && self.state != SyncState::Idle)
|
|
|
|
|
&& !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.block_set != Some(BlockSet::OldBlocks) && p.can_sync()) {
|
|
|
|
|
|
|
|
|
|
self.complete_sync(io);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Called after all blocks have been downloaded
|
|
|
|
|
@@ -1017,7 +1085,7 @@ impl ChainSync {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
let (peer_latest, peer_difficulty, peer_snapshot_number, peer_snapshot_hash) = {
|
|
|
|
|
if let Some(ref peer) = self.peers.get_mut(&peer_id) {
|
|
|
|
|
if let Some(peer) = self.peers.get_mut(&peer_id) {
|
|
|
|
|
if peer.asking != PeerAsking::Nothing || !peer.can_sync() {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
@@ -1040,11 +1108,9 @@ impl ChainSync {
|
|
|
|
|
let higher_difficulty = peer_difficulty.map_or(true, |pd| pd > syncing_difficulty);
|
|
|
|
|
if force || self.state == SyncState::NewBlocks || higher_difficulty || self.old_blocks.is_some() {
|
|
|
|
|
match self.state {
|
|
|
|
|
SyncState::Idle if self.snapshot_sync_enabled
|
|
|
|
|
&& chain_info.best_block_number < peer_snapshot_number
|
|
|
|
|
&& (peer_snapshot_number - chain_info.best_block_number) > SNAPSHOT_RESTORE_THRESHOLD => {
|
|
|
|
|
trace!(target: "sync", "Starting snapshot sync: {} vs {}", peer_snapshot_number, chain_info.best_block_number);
|
|
|
|
|
self.start_snapshot_sync(io, peer_id);
|
|
|
|
|
SyncState::WaitingPeers => {
|
|
|
|
|
trace!(target: "sync", "Checking snapshot sync: {} vs {}", peer_snapshot_number, chain_info.best_block_number);
|
|
|
|
|
self.maybe_start_snapshot_sync(io);
|
|
|
|
|
},
|
|
|
|
|
SyncState::Idle | SyncState::Blocks | SyncState::NewBlocks => {
|
|
|
|
|
if io.chain().queue_info().is_full() {
|
|
|
|
|
@@ -1070,6 +1136,13 @@ impl ChainSync {
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
SyncState::SnapshotData => {
|
|
|
|
|
if let RestorationStatus::Ongoing { state_chunks: _, block_chunks: _, state_chunks_done, block_chunks_done, } = io.snapshot_service().status() {
|
|
|
|
|
if self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize > MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
|
|
|
|
|
trace!(target: "sync", "Snapshot queue full, pausing sync");
|
|
|
|
|
self.state = SyncState::SnapshotWaiting;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if peer_snapshot_hash.is_some() && peer_snapshot_hash == self.snapshot.snapshot_hash() {
|
|
|
|
|
self.request_snapshot_data(io, peer_id);
|
|
|
|
|
}
|
|
|
|
|
@@ -1142,6 +1215,7 @@ impl ChainSync {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import.
|
|
|
|
|
#[cfg_attr(feature="dev", allow(block_in_if_condition_stmt))]
|
|
|
|
|
fn collect_blocks(&mut self, io: &mut SyncIo, block_set: BlockSet) {
|
|
|
|
|
match block_set {
|
|
|
|
|
BlockSet::NewBlocks => {
|
|
|
|
|
@@ -1242,7 +1316,7 @@ impl ChainSync {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Generic request sender
|
|
|
|
|
@@ -1252,7 +1326,7 @@ impl ChainSync {
|
|
|
|
|
warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
|
|
|
|
|
}
|
|
|
|
|
peer.asking = asking;
|
|
|
|
|
peer.ask_time = time::precise_time_s();
|
|
|
|
|
peer.ask_time = time::precise_time_ns();
|
|
|
|
|
let result = if packet_id >= ETH_PACKET_COUNT {
|
|
|
|
|
sync.send_protocol(WARP_SYNC_PROTOCOL_ID, peer_id, packet_id, packet)
|
|
|
|
|
} else {
|
|
|
|
|
@@ -1276,7 +1350,7 @@ impl ChainSync {
|
|
|
|
|
/// Called when peer sends us new transactions
|
|
|
|
|
fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
|
|
|
|
// Accept transactions only when fully synced
|
|
|
|
|
if !io.is_chain_queue_empty() || self.state != SyncState::Idle || self.state != SyncState::NewBlocks {
|
|
|
|
|
if !io.is_chain_queue_empty() || (self.state != SyncState::Idle && self.state != SyncState::NewBlocks) {
|
|
|
|
|
trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id);
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
@@ -1370,7 +1444,7 @@ impl ChainSync {
|
|
|
|
|
while number <= last && count < max_count {
|
|
|
|
|
if let Some(hdr) = overlay.get(&number) {
|
|
|
|
|
trace!(target: "sync", "{}: Returning cached fork header", peer_id);
|
|
|
|
|
data.extend(hdr);
|
|
|
|
|
data.extend_from_slice(hdr);
|
|
|
|
|
count += 1;
|
|
|
|
|
} else if let Some(mut hdr) = io.chain().block_header(BlockID::Number(number)) {
|
|
|
|
|
data.append(&mut hdr);
|
|
|
|
|
@@ -1427,16 +1501,18 @@ impl ChainSync {
|
|
|
|
|
}
|
|
|
|
|
count = min(count, MAX_NODE_DATA_TO_SEND);
|
|
|
|
|
let mut added = 0usize;
|
|
|
|
|
let mut data = Bytes::new();
|
|
|
|
|
let mut data = Vec::new();
|
|
|
|
|
for i in 0..count {
|
|
|
|
|
if let Some(mut hdr) = io.chain().state_data(&try!(r.val_at::<H256>(i))) {
|
|
|
|
|
data.append(&mut hdr);
|
|
|
|
|
if let Some(hdr) = io.chain().state_data(&try!(r.val_at::<H256>(i))) {
|
|
|
|
|
data.push(hdr);
|
|
|
|
|
added += 1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
trace!(target: "sync", "{} -> GetNodeData: return {} entries", peer_id, added);
|
|
|
|
|
let mut rlp = RlpStream::new_list(added);
|
|
|
|
|
rlp.append_raw(&data, added);
|
|
|
|
|
for d in data {
|
|
|
|
|
rlp.append(&d);
|
|
|
|
|
}
|
|
|
|
|
Ok(Some((NODE_DATA_PACKET, rlp)))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1587,17 +1663,18 @@ impl ChainSync {
|
|
|
|
|
|
|
|
|
|
#[cfg_attr(feature="dev", allow(match_same_arms))]
|
|
|
|
|
pub fn maintain_peers(&mut self, io: &mut SyncIo) {
|
|
|
|
|
let tick = time::precise_time_s();
|
|
|
|
|
let tick = time::precise_time_ns();
|
|
|
|
|
let mut aborting = Vec::new();
|
|
|
|
|
for (peer_id, peer) in &self.peers {
|
|
|
|
|
let elapsed = (tick - peer.ask_time) / 1_000_000_000;
|
|
|
|
|
let timeout = match peer.asking {
|
|
|
|
|
PeerAsking::BlockHeaders => (tick - peer.ask_time) > HEADERS_TIMEOUT_SEC,
|
|
|
|
|
PeerAsking::BlockBodies => (tick - peer.ask_time) > BODIES_TIMEOUT_SEC,
|
|
|
|
|
PeerAsking::BlockReceipts => (tick - peer.ask_time) > RECEIPTS_TIMEOUT_SEC,
|
|
|
|
|
PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT_SEC,
|
|
|
|
|
PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT_SEC,
|
|
|
|
|
PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT_SEC,
|
|
|
|
|
PeerAsking::Nothing => false,
|
|
|
|
|
PeerAsking::ForkHeader => (tick - peer.ask_time) > FORK_HEADER_TIMEOUT_SEC,
|
|
|
|
|
PeerAsking::SnapshotManifest => (tick - peer.ask_time) > SNAPSHOT_MANIFEST_TIMEOUT_SEC,
|
|
|
|
|
PeerAsking::SnapshotData => (tick - peer.ask_time) > SNAPSHOT_DATA_TIMEOUT_SEC,
|
|
|
|
|
PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT_SEC,
|
|
|
|
|
PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT_SEC,
|
|
|
|
|
PeerAsking::SnapshotData => elapsed > SNAPSHOT_DATA_TIMEOUT_SEC,
|
|
|
|
|
};
|
|
|
|
|
if timeout {
|
|
|
|
|
trace!(target:"sync", "Timeout {}", peer_id);
|
|
|
|
|
@@ -1608,16 +1685,42 @@ impl ChainSync {
|
|
|
|
|
for p in aborting {
|
|
|
|
|
self.on_peer_aborting(io, p);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Check for handshake timeouts
|
|
|
|
|
for (peer, ask_time) in &self.handshaking_peers {
|
|
|
|
|
let elapsed = (tick - ask_time) / 1_000_000_000;
|
|
|
|
|
if elapsed > STATUS_TIMEOUT_SEC {
|
|
|
|
|
trace!(target:"sync", "Status timeout {}", peer);
|
|
|
|
|
io.disconnect_peer(*peer);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn check_resume(&mut self, io: &mut SyncIo) {
|
|
|
|
|
if self.state == SyncState::Waiting && !io.chain().queue_info().is_full() && self.state == SyncState::Waiting {
|
|
|
|
|
self.state = SyncState::Blocks;
|
|
|
|
|
self.continue_sync(io);
|
|
|
|
|
} else if self.state == SyncState::SnapshotWaiting && io.snapshot_service().status() == RestorationStatus::Inactive {
|
|
|
|
|
trace!(target:"sync", "Snapshot restoration is complete");
|
|
|
|
|
self.restart(io);
|
|
|
|
|
self.continue_sync(io);
|
|
|
|
|
} else if self.state == SyncState::SnapshotWaiting {
|
|
|
|
|
match io.snapshot_service().status() {
|
|
|
|
|
RestorationStatus::Inactive => {
|
|
|
|
|
trace!(target:"sync", "Snapshot restoration is complete");
|
|
|
|
|
self.restart(io);
|
|
|
|
|
self.continue_sync(io);
|
|
|
|
|
},
|
|
|
|
|
RestorationStatus::Ongoing { state_chunks: _, block_chunks: _, state_chunks_done, block_chunks_done, } => {
|
|
|
|
|
if !self.snapshot.is_complete() && self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize <= MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
|
|
|
|
|
trace!(target:"sync", "Resuming snapshot sync");
|
|
|
|
|
self.state = SyncState::SnapshotData;
|
|
|
|
|
self.continue_sync(io);
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
RestorationStatus::Failed => {
|
|
|
|
|
trace!(target: "sync", "Snapshot restoration aborted");
|
|
|
|
|
self.state = SyncState::WaitingPeers;
|
|
|
|
|
self.snapshot.clear();
|
|
|
|
|
self.continue_sync(io);
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1707,7 +1810,7 @@ impl ChainSync {
|
|
|
|
|
self.send_packet(io, *peer_id, NEW_BLOCK_PACKET, rlp);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
|
|
|
|
|
if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
|
|
|
|
|
peer.latest_hash = chain_info.best_block_hash.clone();
|
|
|
|
|
}
|
|
|
|
|
sent += 1;
|
|
|
|
|
@@ -1725,7 +1828,7 @@ impl ChainSync {
|
|
|
|
|
sent += match ChainSync::create_new_hashes_rlp(io.chain(), &last_parent, &chain_info.best_block_hash) {
|
|
|
|
|
Some(rlp) => {
|
|
|
|
|
{
|
|
|
|
|
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
|
|
|
|
|
if let Some(ref mut peer) = self.peers.get_mut(peer_id) {
|
|
|
|
|
peer.latest_hash = chain_info.best_block_hash.clone();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1793,7 +1896,7 @@ impl ChainSync {
|
|
|
|
|
// Send RLPs
|
|
|
|
|
let sent = lucky_peers.len();
|
|
|
|
|
if sent > 0 {
|
|
|
|
|
for (peer_id, rlp) in lucky_peers.into_iter() {
|
|
|
|
|
for (peer_id, rlp) in lucky_peers {
|
|
|
|
|
self.send_packet(io, peer_id, TRANSACTIONS_PACKET, rlp);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1825,6 +1928,7 @@ impl ChainSync {
|
|
|
|
|
|
|
|
|
|
/// Maintain other peers. Send out any new blocks and transactions
|
|
|
|
|
pub fn maintain_sync(&mut self, io: &mut SyncIo) {
|
|
|
|
|
self.maybe_start_snapshot_sync(io);
|
|
|
|
|
self.check_resume(io);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -2025,7 +2129,9 @@ mod tests {
|
|
|
|
|
assert!(rlp_result.is_some());
|
|
|
|
|
|
|
|
|
|
// the length of one rlp-encoded hashe
|
|
|
|
|
assert_eq!(34, rlp_result.unwrap().1.out().len());
|
|
|
|
|
let rlp = rlp_result.unwrap().1.out();
|
|
|
|
|
let rlp = Rlp::new(&rlp);
|
|
|
|
|
assert_eq!(1, rlp.item_count());
|
|
|
|
|
|
|
|
|
|
io.sender = Some(2usize);
|
|
|
|
|
|
|
|
|
|
@@ -2045,7 +2151,7 @@ mod tests {
|
|
|
|
|
asking: PeerAsking::Nothing,
|
|
|
|
|
asking_blocks: Vec::new(),
|
|
|
|
|
asking_hash: None,
|
|
|
|
|
ask_time: 0f64,
|
|
|
|
|
ask_time: 0,
|
|
|
|
|
last_sent_transactions: HashSet::new(),
|
|
|
|
|
expired: false,
|
|
|
|
|
confirmation: super::ForkConfirmation::Confirmed,
|
|
|
|
|
|