Snapshot sync (#2047)
* PV64 sync * Tests * Client DB restore * Snapshot restoration over IPC * Upating test * Minor tweaks * Upating test
This commit is contained in:
@@ -96,17 +96,19 @@ use ethcore::header::{BlockNumber, Header as BlockHeader};
|
||||
use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo, BlockImportError};
|
||||
use ethcore::error::*;
|
||||
use ethcore::block::Block;
|
||||
use ethcore::snapshot::{ManifestData, RestorationStatus};
|
||||
use sync_io::SyncIo;
|
||||
use time;
|
||||
use super::SyncConfig;
|
||||
use blocks::BlockCollection;
|
||||
use snapshot::{Snapshot, ChunkType};
|
||||
use rand::{thread_rng, Rng};
|
||||
|
||||
known_heap_size!(0, PeerInfo);
|
||||
|
||||
type PacketDecodeError = DecoderError;
|
||||
|
||||
const PROTOCOL_VERSION: u8 = 63u8;
|
||||
const PROTOCOL_VERSION: u8 = 64u8;
|
||||
const MAX_BODIES_TO_SEND: usize = 256;
|
||||
const MAX_HEADERS_TO_SEND: usize = 512;
|
||||
const MAX_NODE_DATA_TO_SEND: usize = 1024;
|
||||
@@ -136,14 +138,26 @@ const GET_NODE_DATA_PACKET: u8 = 0x0d;
|
||||
const NODE_DATA_PACKET: u8 = 0x0e;
|
||||
const GET_RECEIPTS_PACKET: u8 = 0x0f;
|
||||
const RECEIPTS_PACKET: u8 = 0x10;
|
||||
const GET_SNAPSHOT_MANIFEST_PACKET: u8 = 0x11;
|
||||
const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12;
|
||||
const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13;
|
||||
const SNAPSHOT_DATA_PACKET: u8 = 0x14;
|
||||
|
||||
const HEADERS_TIMEOUT_SEC: f64 = 15f64;
|
||||
const BODIES_TIMEOUT_SEC: f64 = 5f64;
|
||||
const FORK_HEADER_TIMEOUT_SEC: f64 = 3f64;
|
||||
const SNAPSHOT_MANIFEST_TIMEOUT_SEC: f64 = 3f64;
|
||||
const SNAPSHOT_DATA_TIMEOUT_SEC: f64 = 10f64;
|
||||
|
||||
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
|
||||
/// Sync state
|
||||
pub enum SyncState {
|
||||
/// Waiting for pv64 peers to start snapshot syncing
|
||||
SnapshotManifest,
|
||||
/// Downloading snapshot data
|
||||
SnapshotData,
|
||||
/// Waiting for snapshot restoration to complete
|
||||
SnapshotWaiting,
|
||||
/// Downloading subchain heads
|
||||
ChainHead,
|
||||
/// Initial chain sync complete. Waiting for new packets
|
||||
@@ -177,10 +191,14 @@ pub struct SyncStatus {
|
||||
pub blocks_received: BlockNumber,
|
||||
/// Total number of connected peers
|
||||
pub num_peers: usize,
|
||||
/// Total number of active peers
|
||||
/// Total number of active peers.
|
||||
pub num_active_peers: usize,
|
||||
/// Heap memory used in bytes
|
||||
/// Heap memory used in bytes.
|
||||
pub mem_used: usize,
|
||||
/// Snapshot chunks
|
||||
pub num_snapshot_chunks: usize,
|
||||
/// Snapshot chunks downloaded
|
||||
pub snapshot_chunks_done: usize,
|
||||
}
|
||||
|
||||
impl SyncStatus {
|
||||
@@ -207,6 +225,8 @@ enum PeerAsking {
|
||||
BlockHeaders,
|
||||
BlockBodies,
|
||||
Heads,
|
||||
SnapshotManifest,
|
||||
SnapshotData,
|
||||
}
|
||||
|
||||
#[derive(Clone, Eq, PartialEq)]
|
||||
@@ -240,6 +260,8 @@ struct PeerInfo {
|
||||
asking_blocks: Vec<H256>,
|
||||
/// Holds requested header hash if currently requesting block header by hash
|
||||
asking_hash: Option<H256>,
|
||||
/// Holds requested snapshot chunk hash if any.
|
||||
asking_snapshot_data: Option<H256>,
|
||||
/// Request timestamp
|
||||
ask_time: f64,
|
||||
/// Holds a set of transactions recently sent to this peer to avoid spamming.
|
||||
@@ -248,6 +270,10 @@ struct PeerInfo {
|
||||
expired: bool,
|
||||
/// Peer fork confirmation status
|
||||
confirmation: ForkConfirmation,
|
||||
/// Best snapshot hash
|
||||
snapshot_hash: Option<H256>,
|
||||
/// Best snapshot block number
|
||||
snapshot_number: Option<BlockNumber>,
|
||||
}
|
||||
|
||||
impl PeerInfo {
|
||||
@@ -293,6 +319,8 @@ pub struct ChainSync {
|
||||
network_id: U256,
|
||||
/// Optional fork block to check
|
||||
fork_block: Option<(BlockNumber, H256)>,
|
||||
/// Snapshot downloader.
|
||||
snapshot: Snapshot,
|
||||
}
|
||||
|
||||
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
|
||||
@@ -301,8 +329,8 @@ impl ChainSync {
|
||||
/// Create a new instance of syncing strategy.
|
||||
pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync {
|
||||
let chain = chain.chain_info();
|
||||
let mut sync = ChainSync {
|
||||
state: SyncState::ChainHead,
|
||||
ChainSync {
|
||||
state: SyncState::Idle,
|
||||
starting_block: chain.best_block_number,
|
||||
highest_block: None,
|
||||
last_imported_block: chain.best_block_number,
|
||||
@@ -317,16 +345,15 @@ impl ChainSync {
|
||||
_max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
|
||||
network_id: config.network_id,
|
||||
fork_block: config.fork_block,
|
||||
};
|
||||
sync.reset();
|
||||
sync
|
||||
snapshot: Snapshot::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// @returns Synchonization status
|
||||
pub fn status(&self) -> SyncStatus {
|
||||
SyncStatus {
|
||||
state: self.state.clone(),
|
||||
protocol_version: 63,
|
||||
protocol_version: if self.state == SyncState::SnapshotData { 64 } else { 63 },
|
||||
network_id: self.network_id,
|
||||
start_block_number: self.starting_block,
|
||||
last_imported_block_number: Some(self.last_imported_block),
|
||||
@@ -335,6 +362,8 @@ impl ChainSync {
|
||||
blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 },
|
||||
num_peers: self.peers.values().filter(|p| p.is_allowed()).count(),
|
||||
num_active_peers: self.peers.values().filter(|p| p.is_allowed() && p.asking != PeerAsking::Nothing).count(),
|
||||
num_snapshot_chunks: self.snapshot.total_chunks(),
|
||||
snapshot_chunks_done: self.snapshot.done_chunks(),
|
||||
mem_used:
|
||||
self.blocks.heap_size()
|
||||
+ self.peers.heap_size_of_children()
|
||||
@@ -350,8 +379,13 @@ impl ChainSync {
|
||||
|
||||
#[cfg_attr(feature="dev", allow(for_kv_map))] // Because it's not possible to get `values_mut()`
|
||||
/// Reset sync. Clear all downloaded data but keep the queue
|
||||
fn reset(&mut self) {
|
||||
fn reset(&mut self, io: &mut SyncIo) {
|
||||
self.blocks.clear();
|
||||
self.snapshot.clear();
|
||||
if self.state == SyncState::SnapshotData {
|
||||
debug!(target:"sync", "Aborting snapshot restore");
|
||||
io.snapshot_service().abort_restore();
|
||||
}
|
||||
for (_, ref mut p) in &mut self.peers {
|
||||
p.asking_blocks.clear();
|
||||
p.asking_hash = None;
|
||||
@@ -368,7 +402,7 @@ impl ChainSync {
|
||||
/// Restart sync
|
||||
pub fn restart(&mut self, io: &mut SyncIo) {
|
||||
trace!(target: "sync", "Restarting");
|
||||
self.reset();
|
||||
self.reset(io);
|
||||
self.start_sync_round(io);
|
||||
self.continue_sync(io);
|
||||
}
|
||||
@@ -380,13 +414,19 @@ impl ChainSync {
|
||||
if self.active_peers.is_empty() {
|
||||
trace!(target: "sync", "No more active peers");
|
||||
if self.state == SyncState::ChainHead {
|
||||
self.complete_sync();
|
||||
self.complete_sync(io);
|
||||
} else {
|
||||
self.restart(io);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn start_snapshot_sync(&mut self, io: &mut SyncIo, peer_id: PeerId) {
|
||||
self.snapshot.clear();
|
||||
self.request_snapshot_manifest(io, peer_id);
|
||||
self.state = SyncState::SnapshotManifest;
|
||||
}
|
||||
|
||||
/// Restart sync after bad block has been detected. May end up re-downloading up to QUEUE_SIZE blocks
|
||||
fn restart_on_bad_block(&mut self, io: &mut SyncIo) {
|
||||
// Do not assume that the block queue/chain still has our last_imported_block
|
||||
@@ -398,8 +438,9 @@ impl ChainSync {
|
||||
|
||||
/// Called by peer to report status
|
||||
fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||
let protocol_version: u32 = try!(r.val_at(0));
|
||||
let peer = PeerInfo {
|
||||
protocol_version: try!(r.val_at(0)),
|
||||
protocol_version: protocol_version,
|
||||
network_id: try!(r.val_at(1)),
|
||||
difficulty: Some(try!(r.val_at(2))),
|
||||
latest_hash: try!(r.val_at(3)),
|
||||
@@ -412,6 +453,9 @@ impl ChainSync {
|
||||
last_sent_transactions: HashSet::new(),
|
||||
expired: false,
|
||||
confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
|
||||
asking_snapshot_data: None,
|
||||
snapshot_hash: if protocol_version == 64 { Some(try!(r.val_at(5))) } else { None },
|
||||
snapshot_number: if protocol_version == 64 { Some(try!(r.val_at(6))) } else { None },
|
||||
};
|
||||
|
||||
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
|
||||
@@ -749,6 +793,96 @@ impl ChainSync {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Called when snapshot manifest is downloaded from a peer.
|
||||
fn on_snapshot_manifest(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||
if !self.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "Ignoring snapshot manifest from unconfirmed peer {}", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
self.clear_peer_download(peer_id);
|
||||
if !self.reset_peer_asking(peer_id, PeerAsking::SnapshotManifest) || self.state != SyncState::SnapshotManifest {
|
||||
trace!(target: "sync", "{}: Ignored unexpected manifest", peer_id);
|
||||
self.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let manifest_rlp = try!(r.at(0));
|
||||
let manifest = match ManifestData::from_rlp(&manifest_rlp.as_raw()) {
|
||||
Err(e) => {
|
||||
trace!(target: "sync", "{}: Ignored bad manifest: {:?}", peer_id, e);
|
||||
io.disconnect_peer(peer_id);
|
||||
self.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
Ok(manifest) => manifest,
|
||||
};
|
||||
self.snapshot.reset_to(&manifest, &manifest_rlp.as_raw().sha3());
|
||||
io.snapshot_service().begin_restore(manifest);
|
||||
self.state = SyncState::SnapshotData;
|
||||
|
||||
// give a task to the same peer first.
|
||||
self.sync_peer(io, peer_id, false);
|
||||
// give tasks to other peers
|
||||
self.continue_sync(io);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Called when snapshot data is downloaded from a peer.
|
||||
fn on_snapshot_data(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||
if !self.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "Ignoring snapshot data from unconfirmed peer {}", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
self.clear_peer_download(peer_id);
|
||||
if !self.reset_peer_asking(peer_id, PeerAsking::SnapshotData) || self.state != SyncState::SnapshotData {
|
||||
trace!(target: "sync", "{}: Ignored unexpected snapshot data", peer_id);
|
||||
self.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// check service status
|
||||
match io.snapshot_service().status() {
|
||||
RestorationStatus::Inactive | RestorationStatus::Failed => {
|
||||
trace!(target: "sync", "{}: Snapshot restoration aborted", peer_id);
|
||||
self.state = SyncState::Idle;
|
||||
self.snapshot.clear();
|
||||
self.continue_sync(io);
|
||||
return Ok(());
|
||||
},
|
||||
RestorationStatus::Ongoing { .. } => {
|
||||
trace!(target: "sync", "{}: Snapshot restoration is ongoing", peer_id);
|
||||
},
|
||||
}
|
||||
|
||||
let snapshot_data: Bytes = try!(r.val_at(0));
|
||||
match self.snapshot.validate_chunk(&snapshot_data) {
|
||||
Ok(ChunkType::Block(hash)) => {
|
||||
trace!(target: "sync", "{}: Processing block chunk", peer_id);
|
||||
io.snapshot_service().restore_block_chunk(hash, snapshot_data);
|
||||
}
|
||||
Ok(ChunkType::State(hash)) => {
|
||||
trace!(target: "sync", "{}: Processing state chunk", peer_id);
|
||||
io.snapshot_service().restore_state_chunk(hash, snapshot_data);
|
||||
}
|
||||
Err(()) => {
|
||||
trace!(target: "sync", "{}: Got bad snapshot chunk", peer_id);
|
||||
io.disconnect_peer(peer_id);
|
||||
self.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
if self.snapshot.is_complete() {
|
||||
// wait for snapshot restoration process to complete
|
||||
self.state = SyncState::SnapshotWaiting;
|
||||
}
|
||||
// give a task to the same peer first.
|
||||
self.sync_peer(io, peer_id, false);
|
||||
// give tasks to other peers
|
||||
self.continue_sync(io);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Called by peer when it is disconnecting
|
||||
pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) {
|
||||
trace!(target: "sync", "== Disconnecting {}: {}", peer, io.peer_info(peer));
|
||||
@@ -764,7 +898,7 @@ impl ChainSync {
|
||||
/// Called when a new peer is connected
|
||||
pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) {
|
||||
trace!(target: "sync", "== Connected {}: {}", peer, io.peer_info(peer));
|
||||
if let Err(e) = self.send_status(io) {
|
||||
if let Err(e) = self.send_status(io, peer) {
|
||||
debug!(target:"sync", "Error sending status request: {:?}", e);
|
||||
io.disable_peer(peer);
|
||||
}
|
||||
@@ -772,24 +906,27 @@ impl ChainSync {
|
||||
|
||||
/// Resume downloading
|
||||
fn continue_sync(&mut self, io: &mut SyncIo) {
|
||||
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().filter_map(|(k, p)|
|
||||
if p.can_sync() { Some((*k, p.difficulty.unwrap_or_else(U256::zero))) } else { None }).collect();
|
||||
let mut peers: Vec<(PeerId, U256, u32)> = self.peers.iter().filter_map(|(k, p)|
|
||||
if p.can_sync() { Some((*k, p.difficulty.unwrap_or_else(U256::zero), p.protocol_version)) } else { None }).collect();
|
||||
thread_rng().shuffle(&mut peers); //TODO: sort by rating
|
||||
// prefer peers with higher protocol version
|
||||
peers.sort_by(|&(_, _, ref v1), &(_, _, ref v2)| v1.cmp(v2));
|
||||
trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
|
||||
for (p, _) in peers {
|
||||
for (p, _, _) in peers {
|
||||
if self.active_peers.contains(&p) {
|
||||
self.sync_peer(io, p, false);
|
||||
}
|
||||
}
|
||||
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.can_sync()) {
|
||||
self.complete_sync();
|
||||
if self.state != SyncState::Waiting && self.state != SyncState::SnapshotWaiting
|
||||
&& !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.can_sync()) {
|
||||
self.complete_sync(io);
|
||||
}
|
||||
}
|
||||
|
||||
/// Called after all blocks have been downloaded
|
||||
fn complete_sync(&mut self) {
|
||||
fn complete_sync(&mut self, io: &mut SyncIo) {
|
||||
trace!(target: "sync", "Sync complete");
|
||||
self.reset();
|
||||
self.reset(io);
|
||||
self.state = SyncState::Idle;
|
||||
}
|
||||
|
||||
@@ -805,7 +942,7 @@ impl ChainSync {
|
||||
trace!(target: "sync", "Skipping deactivated peer");
|
||||
return;
|
||||
}
|
||||
let (peer_latest, peer_difficulty) = {
|
||||
let (peer_latest, peer_difficulty, peer_snapshot_number, peer_snapshot_hash) = {
|
||||
let peer = self.peers.get_mut(&peer_id).unwrap();
|
||||
if peer.asking != PeerAsking::Nothing || !peer.can_sync() {
|
||||
return;
|
||||
@@ -814,7 +951,11 @@ impl ChainSync {
|
||||
trace!(target: "sync", "Waiting for the block queue");
|
||||
return;
|
||||
}
|
||||
(peer.latest_hash.clone(), peer.difficulty.clone())
|
||||
if self.state == SyncState::SnapshotWaiting {
|
||||
trace!(target: "sync", "Waiting for the snapshot restoration");
|
||||
return;
|
||||
}
|
||||
(peer.latest_hash.clone(), peer.difficulty.clone(), peer.snapshot_number.as_ref().cloned(), peer.snapshot_hash.as_ref().cloned())
|
||||
};
|
||||
let chain_info = io.chain().chain_info();
|
||||
let td = chain_info.pending_total_difficulty;
|
||||
@@ -823,13 +964,18 @@ impl ChainSync {
|
||||
if force || self.state == SyncState::NewBlocks || peer_difficulty.map_or(true, |pd| pd > syncing_difficulty) {
|
||||
match self.state {
|
||||
SyncState::Idle => {
|
||||
if self.last_imported_block < chain_info.best_block_number {
|
||||
self.last_imported_block = chain_info.best_block_number;
|
||||
self.last_imported_hash = chain_info.best_block_hash;
|
||||
// check if we can start snapshot sync with this peer
|
||||
if peer_snapshot_number.unwrap_or(0) > 0 && chain_info.best_block_number == 0 {
|
||||
self.start_snapshot_sync(io, peer_id);
|
||||
} else {
|
||||
if self.last_imported_block < chain_info.best_block_number {
|
||||
self.last_imported_block = chain_info.best_block_number;
|
||||
self.last_imported_hash = chain_info.best_block_hash;
|
||||
}
|
||||
trace!(target: "sync", "Starting sync with {}", peer_id);
|
||||
self.start_sync_round(io);
|
||||
self.sync_peer(io, peer_id, force);
|
||||
}
|
||||
trace!(target: "sync", "Starting sync with {}", peer_id);
|
||||
self.start_sync_round(io);
|
||||
self.sync_peer(io, peer_id, force);
|
||||
},
|
||||
SyncState::ChainHead => {
|
||||
// Request subchain headers
|
||||
@@ -843,8 +989,14 @@ impl ChainSync {
|
||||
if io.chain().block_status(BlockID::Hash(peer_latest)) == BlockStatus::Unknown {
|
||||
self.request_blocks(io, peer_id, false);
|
||||
}
|
||||
}
|
||||
SyncState::Waiting => ()
|
||||
},
|
||||
SyncState::SnapshotData => {
|
||||
if peer_snapshot_hash.is_some() && peer_snapshot_hash == self.snapshot.snapshot_hash() {
|
||||
self.request_snapshot_data(io, peer_id);
|
||||
}
|
||||
},
|
||||
SyncState::SnapshotManifest => (), //already downloading from other peer
|
||||
SyncState::Waiting | SyncState::SnapshotWaiting => ()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -903,6 +1055,16 @@ impl ChainSync {
|
||||
}
|
||||
}
|
||||
|
||||
/// Find some headers or blocks to download for a peer.
|
||||
fn request_snapshot_data(&mut self, io: &mut SyncIo, peer_id: PeerId) {
|
||||
self.clear_peer_download(peer_id);
|
||||
// find chunk data to download
|
||||
if let Some(hash) = self.snapshot.needed_chunk() {
|
||||
self.peers.get_mut(&peer_id).unwrap().asking_snapshot_data = Some(hash.clone());
|
||||
self.request_snapshot_chunk(io, peer_id, &hash);
|
||||
}
|
||||
}
|
||||
|
||||
/// Clear all blocks/headers marked as being downloaded by a peer.
|
||||
fn clear_peer_download(&mut self, peer_id: PeerId) {
|
||||
let peer = self.peers.get_mut(&peer_id).unwrap();
|
||||
@@ -917,9 +1079,15 @@ impl ChainSync {
|
||||
self.blocks.clear_body_download(b);
|
||||
}
|
||||
},
|
||||
PeerAsking::SnapshotData => {
|
||||
if let Some(hash) = peer.asking_snapshot_data {
|
||||
self.snapshot.clear_chunk_download(&hash);
|
||||
}
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
peer.asking_blocks.clear();
|
||||
peer.asking_snapshot_data = None;
|
||||
}
|
||||
|
||||
fn block_imported(&mut self, hash: &H256, number: BlockNumber, parent: &H256) {
|
||||
@@ -1016,6 +1184,22 @@ impl ChainSync {
|
||||
rlp.append(&if reverse {1u32} else {0u32});
|
||||
self.send_request(sync, peer_id, asking, GET_BLOCK_HEADERS_PACKET, rlp.out());
|
||||
}
|
||||
|
||||
/// Request snapshot manifest from a peer.
|
||||
fn request_snapshot_manifest(&mut self, sync: &mut SyncIo, peer_id: PeerId) {
|
||||
trace!(target: "sync", "{} <- GetSnapshotManifest", peer_id);
|
||||
let rlp = RlpStream::new_list(0);
|
||||
self.send_request(sync, peer_id, PeerAsking::SnapshotManifest, GET_SNAPSHOT_MANIFEST_PACKET, rlp.out());
|
||||
}
|
||||
|
||||
/// Request snapshot chunk from a peer.
|
||||
fn request_snapshot_chunk(&mut self, sync: &mut SyncIo, peer_id: PeerId, chunk: &H256) {
|
||||
trace!(target: "sync", "{} <- GetSnapshotData {:?}", peer_id, chunk);
|
||||
let mut rlp = RlpStream::new_list(1);
|
||||
rlp.append(chunk);
|
||||
self.send_request(sync, peer_id, PeerAsking::SnapshotData, GET_SNAPSHOT_DATA_PACKET, rlp.out());
|
||||
}
|
||||
|
||||
/// Request block bodies from a peer
|
||||
fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: PeerId, hashes: Vec<H256>) {
|
||||
let mut rlp = RlpStream::new_list(hashes.len());
|
||||
@@ -1086,14 +1270,22 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Send Status message
|
||||
fn send_status(&mut self, io: &mut SyncIo) -> Result<(), NetworkError> {
|
||||
let mut packet = RlpStream::new_list(5);
|
||||
fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), NetworkError> {
|
||||
let pv64 = io.eth_protocol_version(peer) >= 64;
|
||||
let mut packet = RlpStream::new_list(if pv64 { 7 } else { 5 });
|
||||
let chain = io.chain().chain_info();
|
||||
packet.append(&(PROTOCOL_VERSION as u32));
|
||||
packet.append(&self.network_id);
|
||||
packet.append(&chain.total_difficulty);
|
||||
packet.append(&chain.best_block_hash);
|
||||
packet.append(&chain.genesis_hash);
|
||||
if pv64 {
|
||||
let manifest = io.snapshot_service().manifest();
|
||||
let block_number = manifest.as_ref().map_or(0, |m| m.block_number);
|
||||
let manifest_hash = manifest.map_or(H256::new(), |m| m.into_rlp().sha3());
|
||||
packet.append(&manifest_hash);
|
||||
packet.append(&block_number);
|
||||
}
|
||||
io.respond(STATUS_PACKET, packet.out())
|
||||
}
|
||||
|
||||
@@ -1230,6 +1422,48 @@ impl ChainSync {
|
||||
Ok(Some((RECEIPTS_PACKET, rlp_result)))
|
||||
}
|
||||
|
||||
/// Respond to GetSnapshotManifest request
|
||||
fn return_snapshot_manifest(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let count = r.item_count();
|
||||
trace!(target: "sync", "{} -> GetSnapshotManifest", peer_id);
|
||||
if count != 0 {
|
||||
debug!(target: "sync", "Invalid GetSnapshotManifest request, ignoring.");
|
||||
return Ok(None);
|
||||
}
|
||||
let rlp = match io.snapshot_service().manifest() {
|
||||
Some(manifest) => {
|
||||
trace!(target: "sync", "{} <- SnapshotManifest", peer_id);
|
||||
let mut rlp = RlpStream::new_list(1);
|
||||
rlp.append_raw(&manifest.into_rlp(), 1);
|
||||
rlp
|
||||
},
|
||||
None => {
|
||||
trace!(target: "sync", "{}: No manifest to return", peer_id);
|
||||
let rlp = RlpStream::new_list(0);
|
||||
rlp
|
||||
}
|
||||
};
|
||||
Ok(Some((SNAPSHOT_MANIFEST_PACKET, rlp)))
|
||||
}
|
||||
|
||||
/// Respond to GetSnapshotManifest request
|
||||
fn return_snapshot_data(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let hash: H256 = try!(r.val_at(0));
|
||||
trace!(target: "sync", "{} -> GetSnapshotData {:?}", peer_id, hash);
|
||||
let rlp = match io.snapshot_service().chunk(hash) {
|
||||
Some(data) => {
|
||||
let mut rlp = RlpStream::new_list(1);
|
||||
rlp.append(&data);
|
||||
rlp
|
||||
},
|
||||
None => {
|
||||
let rlp = RlpStream::new_list(0);
|
||||
rlp
|
||||
}
|
||||
};
|
||||
Ok(Some((SNAPSHOT_DATA_PACKET, rlp)))
|
||||
}
|
||||
|
||||
fn return_rlp<FRlp, FError>(io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
|
||||
where FRlp : Fn(&SyncIo, &UntrustedRlp, PeerId) -> RlpResponseResult,
|
||||
FError : FnOnce(NetworkError) -> String
|
||||
@@ -1266,6 +1500,14 @@ impl ChainSync {
|
||||
ChainSync::return_node_data,
|
||||
|e| format!("Error sending nodes: {:?}", e)),
|
||||
|
||||
GET_SNAPSHOT_MANIFEST_PACKET => ChainSync::return_rlp(io, &rlp, peer,
|
||||
ChainSync::return_snapshot_manifest,
|
||||
|e| format!("Error sending snapshot manifest: {:?}", e)),
|
||||
|
||||
GET_SNAPSHOT_DATA_PACKET => ChainSync::return_rlp(io, &rlp, peer,
|
||||
ChainSync::return_snapshot_data,
|
||||
|e| format!("Error sending snapshot data: {:?}", e)),
|
||||
|
||||
_ => {
|
||||
sync.write().on_packet(io, peer, packet_id, data);
|
||||
Ok(())
|
||||
@@ -1289,6 +1531,8 @@ impl ChainSync {
|
||||
BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp),
|
||||
NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp),
|
||||
NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp),
|
||||
SNAPSHOT_MANIFEST_PACKET => self.on_snapshot_manifest(io, peer, &rlp),
|
||||
SNAPSHOT_DATA_PACKET => self.on_snapshot_data(io, peer, &rlp),
|
||||
_ => {
|
||||
debug!(target: "sync", "Unknown packet {}", packet_id);
|
||||
Ok(())
|
||||
@@ -1308,6 +1552,8 @@ impl ChainSync {
|
||||
PeerAsking::BlockBodies => (tick - peer.ask_time) > BODIES_TIMEOUT_SEC,
|
||||
PeerAsking::Nothing => false,
|
||||
PeerAsking::ForkHeader => (tick - peer.ask_time) > FORK_HEADER_TIMEOUT_SEC,
|
||||
PeerAsking::SnapshotManifest => (tick - peer.ask_time) > SNAPSHOT_MANIFEST_TIMEOUT_SEC,
|
||||
PeerAsking::SnapshotData => (tick - peer.ask_time) > SNAPSHOT_DATA_TIMEOUT_SEC,
|
||||
};
|
||||
if timeout {
|
||||
trace!(target:"sync", "Timeout {}", peer_id);
|
||||
@@ -1321,9 +1567,12 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
fn check_resume(&mut self, io: &mut SyncIo) {
|
||||
if !io.chain().queue_info().is_full() && self.state == SyncState::Waiting {
|
||||
if self.state == SyncState::Waiting && !io.chain().queue_info().is_full() && self.state == SyncState::Waiting {
|
||||
self.state = SyncState::Blocks;
|
||||
self.continue_sync(io);
|
||||
} else if self.state == SyncState::SnapshotWaiting && io.snapshot_service().status() == RestorationStatus::Inactive {
|
||||
self.state = SyncState::Idle;
|
||||
self.continue_sync(io);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1559,6 +1808,7 @@ impl ChainSync {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use tests::helpers::*;
|
||||
use tests::snapshot::TestSnapshotService;
|
||||
use super::*;
|
||||
use ::SyncConfig;
|
||||
use util::*;
|
||||
@@ -1612,7 +1862,8 @@ mod tests {
|
||||
fn return_receipts_empty() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
let mut queue = VecDeque::new();
|
||||
let io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&[0xc0]), 0);
|
||||
|
||||
@@ -1624,7 +1875,8 @@ mod tests {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
let mut queue = VecDeque::new();
|
||||
let sync = dummy_sync_with_peer(H256::new(), &client);
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let mut receipt_list = RlpStream::new_list(4);
|
||||
receipt_list.append(&H256::from("0000000000000000000000000000000000000000000000005555555555555555"));
|
||||
@@ -1679,7 +1931,8 @@ mod tests {
|
||||
let hashes: Vec<_> = headers.iter().map(|h| HeaderView::new(h).sha3()).collect();
|
||||
|
||||
let mut queue = VecDeque::new();
|
||||
let io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let unknown: H256 = H256::new();
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, false)), 0);
|
||||
@@ -1717,7 +1970,8 @@ mod tests {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
let mut queue = VecDeque::new();
|
||||
let sync = dummy_sync_with_peer(H256::new(), &client);
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let mut node_list = RlpStream::new_list(3);
|
||||
node_list.append(&H256::from("0000000000000000000000000000000000000000000000005555555555555555"));
|
||||
@@ -1758,6 +2012,9 @@ mod tests {
|
||||
last_sent_transactions: HashSet::new(),
|
||||
expired: false,
|
||||
confirmation: super::ForkConfirmation::Confirmed,
|
||||
snapshot_number: None,
|
||||
snapshot_hash: None,
|
||||
asking_snapshot_data: None,
|
||||
});
|
||||
sync
|
||||
}
|
||||
@@ -1769,7 +2026,8 @@ mod tests {
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let lagging_peers = sync.get_lagging_peers(&chain_info, &io);
|
||||
|
||||
@@ -1800,7 +2058,8 @@ mod tests {
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let peers = sync.get_lagging_peers(&chain_info, &io);
|
||||
let peer_count = sync.propagate_new_hashes(&chain_info, &mut io, &peers);
|
||||
@@ -1820,7 +2079,8 @@ mod tests {
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
let peers = sync.get_lagging_peers(&chain_info, &io);
|
||||
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[], &peers);
|
||||
|
||||
@@ -1840,7 +2100,8 @@ mod tests {
|
||||
let hash = client.block_hash(BlockID::Number(99)).unwrap();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
let peers = sync.get_lagging_peers(&chain_info, &io);
|
||||
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()], &peers);
|
||||
|
||||
@@ -1859,7 +2120,8 @@ mod tests {
|
||||
client.insert_transaction_to_queue();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
let peer_count = sync.propagate_new_transactions(&mut io);
|
||||
// Try to propagate same transactions for the second time
|
||||
let peer_count2 = sync.propagate_new_transactions(&mut io);
|
||||
@@ -1880,7 +2142,8 @@ mod tests {
|
||||
client.insert_transaction_to_queue();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
let peer_count = sync.propagate_new_transactions(&mut io);
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]);
|
||||
// Try to propagate same transactions for the second time
|
||||
@@ -1903,17 +2166,17 @@ mod tests {
|
||||
client.insert_transaction_to_queue();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut ss = TestSnapshotService::new();
|
||||
// should sent some
|
||||
{
|
||||
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
let peer_count = sync.propagate_new_transactions(&mut io);
|
||||
assert_eq!(1, io.queue.len());
|
||||
assert_eq!(1, peer_count);
|
||||
}
|
||||
// Insert some more
|
||||
client.insert_transaction_to_queue();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
// Propagate new transactions
|
||||
let peer_count2 = sync.propagate_new_transactions(&mut io);
|
||||
// And now the peer should have all transactions
|
||||
@@ -1939,7 +2202,8 @@ mod tests {
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
//sync.have_common_block = true;
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let block = UntrustedRlp::new(&block_data);
|
||||
|
||||
@@ -1957,7 +2221,8 @@ mod tests {
|
||||
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let block = UntrustedRlp::new(&block_data);
|
||||
|
||||
@@ -1972,7 +2237,8 @@ mod tests {
|
||||
client.add_blocks(10, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let empty_data = vec![];
|
||||
let block = UntrustedRlp::new(&empty_data);
|
||||
@@ -1988,7 +2254,8 @@ mod tests {
|
||||
client.add_blocks(10, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let hashes_data = get_dummy_hashes();
|
||||
let hashes_rlp = UntrustedRlp::new(&hashes_data);
|
||||
@@ -2004,7 +2271,8 @@ mod tests {
|
||||
client.add_blocks(10, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let empty_hashes_data = vec![];
|
||||
let hashes_rlp = UntrustedRlp::new(&empty_hashes_data);
|
||||
@@ -2023,7 +2291,8 @@ mod tests {
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let peers = sync.get_lagging_peers(&chain_info, &io);
|
||||
sync.propagate_new_hashes(&chain_info, &mut io, &peers);
|
||||
@@ -2042,7 +2311,8 @@ mod tests {
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let peers = sync.get_lagging_peers(&chain_info, &io);
|
||||
sync.propagate_blocks(&chain_info, &mut io, &[], &peers);
|
||||
@@ -2076,7 +2346,8 @@ mod tests {
|
||||
// when
|
||||
{
|
||||
let mut queue = VecDeque::new();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks);
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[]);
|
||||
assert_eq!(io.chain.miner.status().transactions_in_future_queue, 0);
|
||||
@@ -2090,7 +2361,8 @@ mod tests {
|
||||
}
|
||||
{
|
||||
let mut queue = VecDeque::new();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &good_blocks, &retracted_blocks);
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks, &[]);
|
||||
}
|
||||
@@ -2114,7 +2386,8 @@ mod tests {
|
||||
let retracted_blocks = vec![client.block_hash_delta_minus(1)];
|
||||
|
||||
let mut queue = VecDeque::new();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
// when
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[]);
|
||||
|
||||
Reference in New Issue
Block a user