Merge remote-tracking branch 'parity/master' into bft
Conflicts: sync/src/api.rs sync/src/lib.rs
This commit is contained in:
@@ -20,6 +20,7 @@ use network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId,
|
||||
use util::{U256, H256};
|
||||
use io::{TimerToken};
|
||||
use ethcore::client::{BlockChainClient, ChainNotify};
|
||||
use ethcore::snapshot::SnapshotService;
|
||||
use ethcore::header::BlockNumber;
|
||||
use sync_io::NetSyncIo;
|
||||
use chain::{ChainSync, SyncStatus};
|
||||
@@ -76,14 +77,14 @@ pub struct EthSync {
|
||||
|
||||
impl EthSync {
|
||||
/// Creates and register protocol with the network service
|
||||
pub fn new(config: SyncConfig, chain: Arc<BlockChainClient>, network_config: NetworkConfiguration) -> Result<Arc<EthSync>, NetworkError> {
|
||||
pub fn new(config: SyncConfig, chain: Arc<BlockChainClient>, snapshot_service: Arc<SnapshotService>, network_config: NetworkConfiguration) -> Result<Arc<EthSync>, NetworkError> {
|
||||
let inf_sync = InfinitySync::new(&config, chain.clone());
|
||||
let chain_sync = ChainSync::new(config, &*chain);
|
||||
let service = try!(NetworkService::new(try!(network_config.into_basic())));
|
||||
let sync = Arc::new(EthSync{
|
||||
network: service,
|
||||
eth_handler: Arc::new(SyncProtocolHandler { sync: RwLock::new(chain_sync), chain: chain.clone() }),
|
||||
inf_handler: Arc::new(InfProtocolHandler { sync: RwLock::new(inf_sync), chain: chain }),
|
||||
eth_handler: Arc::new(SyncProtocolHandler { sync: RwLock::new(chain_sync), chain: chain.clone(), snapshot_service: snapshot_service.clone() }),
|
||||
inf_handler: Arc::new(InfProtocolHandler { sync: RwLock::new(inf_sync), chain: chain, snapshot_service: snapshot_service }),
|
||||
});
|
||||
|
||||
Ok(sync)
|
||||
@@ -102,6 +103,8 @@ impl SyncProvider for EthSync {
|
||||
struct SyncProtocolHandler {
|
||||
/// Shared blockchain client.
|
||||
chain: Arc<BlockChainClient>,
|
||||
/// Shared snapshot service.
|
||||
snapshot_service: Arc<SnapshotService>,
|
||||
/// Sync strategy
|
||||
sync: RwLock<ChainSync>,
|
||||
}
|
||||
@@ -112,27 +115,29 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
|
||||
}
|
||||
|
||||
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
||||
ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain), *peer, packet_id, data);
|
||||
ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service), *peer, packet_id, data);
|
||||
}
|
||||
|
||||
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
|
||||
self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain), *peer);
|
||||
self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service), *peer);
|
||||
}
|
||||
|
||||
fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
|
||||
self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain), *peer);
|
||||
self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service), *peer);
|
||||
}
|
||||
|
||||
fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
|
||||
self.sync.write().maintain_peers(&mut NetSyncIo::new(io, &*self.chain));
|
||||
self.sync.write().maintain_sync(&mut NetSyncIo::new(io, &*self.chain));
|
||||
self.sync.write().propagate_new_transactions(&mut NetSyncIo::new(io, &*self.chain));
|
||||
self.sync.write().maintain_peers(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service));
|
||||
self.sync.write().maintain_sync(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service));
|
||||
self.sync.write().propagate_new_transactions(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service));
|
||||
}
|
||||
}
|
||||
|
||||
struct InfProtocolHandler {
|
||||
/// Shared blockchain client.
|
||||
chain: Arc<BlockChainClient>,
|
||||
/// Shared snapshot service.
|
||||
snapshot_service: Arc<SnapshotService>,
|
||||
/// Sync strategy
|
||||
sync: RwLock<InfinitySync>,
|
||||
}
|
||||
@@ -142,15 +147,15 @@ impl NetworkProtocolHandler for InfProtocolHandler {
|
||||
}
|
||||
|
||||
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
||||
InfinitySync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain), *peer, packet_id, data);
|
||||
InfinitySync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service), *peer, packet_id, data);
|
||||
}
|
||||
|
||||
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
|
||||
self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain), *peer);
|
||||
self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service), *peer);
|
||||
}
|
||||
|
||||
fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
|
||||
self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain), *peer);
|
||||
self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service), *peer);
|
||||
}
|
||||
|
||||
fn timeout(&self, _io: &NetworkContext, _timer: TimerToken) {
|
||||
@@ -167,7 +172,7 @@ impl ChainNotify for EthSync {
|
||||
_duration: u64)
|
||||
{
|
||||
self.network.with_context(ETH_PROTOCOL, |context| {
|
||||
let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain);
|
||||
let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service);
|
||||
self.eth_handler.sync.write().chain_new_blocks(
|
||||
&mut sync_io,
|
||||
&imported,
|
||||
@@ -180,7 +185,7 @@ impl ChainNotify for EthSync {
|
||||
|
||||
fn start(&self) {
|
||||
self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e));
|
||||
self.network.register_protocol(self.eth_handler.clone(), ETH_PROTOCOL, &[62u8, 63u8])
|
||||
self.network.register_protocol(self.eth_handler.clone(), ETH_PROTOCOL, &[62u8, 63u8, 64u8])
|
||||
.unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e));
|
||||
self.network.register_protocol(self.inf_handler.clone(), INF_PROTOCOL, &[1u8])
|
||||
.unwrap_or_else(|e| warn!("Error registering infinity protocol: {:?}", e));
|
||||
@@ -192,7 +197,7 @@ impl ChainNotify for EthSync {
|
||||
|
||||
fn broadcast(&self, message: Vec<u8>) {
|
||||
self.network.with_context(ETH_PROTOCOL, |context| {
|
||||
let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain);
|
||||
let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service);
|
||||
self.inf_handler.sync.write().propagate_packet(&mut sync_io, message.clone());
|
||||
});
|
||||
}
|
||||
@@ -245,7 +250,7 @@ impl ManageNetwork for EthSync {
|
||||
|
||||
fn stop_network(&self) {
|
||||
self.network.with_context(ETH_PROTOCOL, |context| {
|
||||
let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain);
|
||||
let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service);
|
||||
self.eth_handler.sync.write().abort(&mut sync_io);
|
||||
});
|
||||
self.stop();
|
||||
|
||||
@@ -96,17 +96,19 @@ use ethcore::header::{BlockNumber, Header as BlockHeader};
|
||||
use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo, BlockImportError};
|
||||
use ethcore::error::*;
|
||||
use ethcore::block::Block;
|
||||
use ethcore::snapshot::{ManifestData, RestorationStatus};
|
||||
use sync_io::SyncIo;
|
||||
use time;
|
||||
use super::SyncConfig;
|
||||
use blocks::BlockCollection;
|
||||
use snapshot::{Snapshot, ChunkType};
|
||||
use rand::{thread_rng, Rng};
|
||||
|
||||
known_heap_size!(0, PeerInfo);
|
||||
|
||||
type PacketDecodeError = DecoderError;
|
||||
|
||||
const PROTOCOL_VERSION: u8 = 63u8;
|
||||
const PROTOCOL_VERSION: u8 = 64u8;
|
||||
const MAX_BODIES_TO_SEND: usize = 256;
|
||||
const MAX_HEADERS_TO_SEND: usize = 512;
|
||||
const MAX_NODE_DATA_TO_SEND: usize = 1024;
|
||||
@@ -136,14 +138,26 @@ const GET_NODE_DATA_PACKET: u8 = 0x0d;
|
||||
const NODE_DATA_PACKET: u8 = 0x0e;
|
||||
const GET_RECEIPTS_PACKET: u8 = 0x0f;
|
||||
const RECEIPTS_PACKET: u8 = 0x10;
|
||||
const GET_SNAPSHOT_MANIFEST_PACKET: u8 = 0x11;
|
||||
const SNAPSHOT_MANIFEST_PACKET: u8 = 0x12;
|
||||
const GET_SNAPSHOT_DATA_PACKET: u8 = 0x13;
|
||||
const SNAPSHOT_DATA_PACKET: u8 = 0x14;
|
||||
|
||||
const HEADERS_TIMEOUT_SEC: f64 = 15f64;
|
||||
const BODIES_TIMEOUT_SEC: f64 = 5f64;
|
||||
const FORK_HEADER_TIMEOUT_SEC: f64 = 3f64;
|
||||
const SNAPSHOT_MANIFEST_TIMEOUT_SEC: f64 = 3f64;
|
||||
const SNAPSHOT_DATA_TIMEOUT_SEC: f64 = 10f64;
|
||||
|
||||
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
|
||||
/// Sync state
|
||||
pub enum SyncState {
|
||||
/// Waiting for pv64 peers to start snapshot syncing
|
||||
SnapshotManifest,
|
||||
/// Downloading snapshot data
|
||||
SnapshotData,
|
||||
/// Waiting for snapshot restoration to complete
|
||||
SnapshotWaiting,
|
||||
/// Downloading subchain heads
|
||||
ChainHead,
|
||||
/// Initial chain sync complete. Waiting for new packets
|
||||
@@ -177,10 +191,14 @@ pub struct SyncStatus {
|
||||
pub blocks_received: BlockNumber,
|
||||
/// Total number of connected peers
|
||||
pub num_peers: usize,
|
||||
/// Total number of active peers
|
||||
/// Total number of active peers.
|
||||
pub num_active_peers: usize,
|
||||
/// Heap memory used in bytes
|
||||
/// Heap memory used in bytes.
|
||||
pub mem_used: usize,
|
||||
/// Snapshot chunks
|
||||
pub num_snapshot_chunks: usize,
|
||||
/// Snapshot chunks downloaded
|
||||
pub snapshot_chunks_done: usize,
|
||||
}
|
||||
|
||||
impl SyncStatus {
|
||||
@@ -207,6 +225,8 @@ enum PeerAsking {
|
||||
BlockHeaders,
|
||||
BlockBodies,
|
||||
Heads,
|
||||
SnapshotManifest,
|
||||
SnapshotData,
|
||||
}
|
||||
|
||||
#[derive(Clone, Eq, PartialEq)]
|
||||
@@ -240,6 +260,8 @@ struct PeerInfo {
|
||||
asking_blocks: Vec<H256>,
|
||||
/// Holds requested header hash if currently requesting block header by hash
|
||||
asking_hash: Option<H256>,
|
||||
/// Holds requested snapshot chunk hash if any.
|
||||
asking_snapshot_data: Option<H256>,
|
||||
/// Request timestamp
|
||||
ask_time: f64,
|
||||
/// Holds a set of transactions recently sent to this peer to avoid spamming.
|
||||
@@ -248,6 +270,10 @@ struct PeerInfo {
|
||||
expired: bool,
|
||||
/// Peer fork confirmation status
|
||||
confirmation: ForkConfirmation,
|
||||
/// Best snapshot hash
|
||||
snapshot_hash: Option<H256>,
|
||||
/// Best snapshot block number
|
||||
snapshot_number: Option<BlockNumber>,
|
||||
}
|
||||
|
||||
impl PeerInfo {
|
||||
@@ -293,6 +319,8 @@ pub struct ChainSync {
|
||||
network_id: U256,
|
||||
/// Optional fork block to check
|
||||
fork_block: Option<(BlockNumber, H256)>,
|
||||
/// Snapshot downloader.
|
||||
snapshot: Snapshot,
|
||||
}
|
||||
|
||||
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
|
||||
@@ -301,8 +329,8 @@ impl ChainSync {
|
||||
/// Create a new instance of syncing strategy.
|
||||
pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync {
|
||||
let chain = chain.chain_info();
|
||||
let mut sync = ChainSync {
|
||||
state: SyncState::ChainHead,
|
||||
ChainSync {
|
||||
state: SyncState::Idle,
|
||||
starting_block: chain.best_block_number,
|
||||
highest_block: None,
|
||||
last_imported_block: chain.best_block_number,
|
||||
@@ -317,16 +345,15 @@ impl ChainSync {
|
||||
_max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
|
||||
network_id: config.network_id,
|
||||
fork_block: config.fork_block,
|
||||
};
|
||||
sync.reset();
|
||||
sync
|
||||
snapshot: Snapshot::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// @returns Synchonization status
|
||||
pub fn status(&self) -> SyncStatus {
|
||||
SyncStatus {
|
||||
state: self.state.clone(),
|
||||
protocol_version: 63,
|
||||
protocol_version: if self.state == SyncState::SnapshotData { 64 } else { 63 },
|
||||
network_id: self.network_id,
|
||||
start_block_number: self.starting_block,
|
||||
last_imported_block_number: Some(self.last_imported_block),
|
||||
@@ -335,6 +362,8 @@ impl ChainSync {
|
||||
blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 },
|
||||
num_peers: self.peers.values().filter(|p| p.is_allowed()).count(),
|
||||
num_active_peers: self.peers.values().filter(|p| p.is_allowed() && p.asking != PeerAsking::Nothing).count(),
|
||||
num_snapshot_chunks: self.snapshot.total_chunks(),
|
||||
snapshot_chunks_done: self.snapshot.done_chunks(),
|
||||
mem_used:
|
||||
self.blocks.heap_size()
|
||||
+ self.peers.heap_size_of_children()
|
||||
@@ -350,8 +379,13 @@ impl ChainSync {
|
||||
|
||||
#[cfg_attr(feature="dev", allow(for_kv_map))] // Because it's not possible to get `values_mut()`
|
||||
/// Reset sync. Clear all downloaded data but keep the queue
|
||||
fn reset(&mut self) {
|
||||
fn reset(&mut self, io: &mut SyncIo) {
|
||||
self.blocks.clear();
|
||||
self.snapshot.clear();
|
||||
if self.state == SyncState::SnapshotData {
|
||||
debug!(target:"sync", "Aborting snapshot restore");
|
||||
io.snapshot_service().abort_restore();
|
||||
}
|
||||
for (_, ref mut p) in &mut self.peers {
|
||||
p.asking_blocks.clear();
|
||||
p.asking_hash = None;
|
||||
@@ -368,7 +402,7 @@ impl ChainSync {
|
||||
/// Restart sync
|
||||
pub fn restart(&mut self, io: &mut SyncIo) {
|
||||
trace!(target: "sync", "Restarting");
|
||||
self.reset();
|
||||
self.reset(io);
|
||||
self.start_sync_round(io);
|
||||
self.continue_sync(io);
|
||||
}
|
||||
@@ -380,13 +414,19 @@ impl ChainSync {
|
||||
if self.active_peers.is_empty() {
|
||||
trace!(target: "sync", "No more active peers");
|
||||
if self.state == SyncState::ChainHead {
|
||||
self.complete_sync();
|
||||
self.complete_sync(io);
|
||||
} else {
|
||||
self.restart(io);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn start_snapshot_sync(&mut self, io: &mut SyncIo, peer_id: PeerId) {
|
||||
self.snapshot.clear();
|
||||
self.request_snapshot_manifest(io, peer_id);
|
||||
self.state = SyncState::SnapshotManifest;
|
||||
}
|
||||
|
||||
/// Restart sync after bad block has been detected. May end up re-downloading up to QUEUE_SIZE blocks
|
||||
fn restart_on_bad_block(&mut self, io: &mut SyncIo) {
|
||||
// Do not assume that the block queue/chain still has our last_imported_block
|
||||
@@ -398,8 +438,9 @@ impl ChainSync {
|
||||
|
||||
/// Called by peer to report status
|
||||
fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||
let protocol_version: u32 = try!(r.val_at(0));
|
||||
let peer = PeerInfo {
|
||||
protocol_version: try!(r.val_at(0)),
|
||||
protocol_version: protocol_version,
|
||||
network_id: try!(r.val_at(1)),
|
||||
difficulty: Some(try!(r.val_at(2))),
|
||||
latest_hash: try!(r.val_at(3)),
|
||||
@@ -412,6 +453,9 @@ impl ChainSync {
|
||||
last_sent_transactions: HashSet::new(),
|
||||
expired: false,
|
||||
confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
|
||||
asking_snapshot_data: None,
|
||||
snapshot_hash: if protocol_version == 64 { Some(try!(r.val_at(5))) } else { None },
|
||||
snapshot_number: if protocol_version == 64 { Some(try!(r.val_at(6))) } else { None },
|
||||
};
|
||||
|
||||
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
|
||||
@@ -749,6 +793,96 @@ impl ChainSync {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Called when snapshot manifest is downloaded from a peer.
|
||||
fn on_snapshot_manifest(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||
if !self.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "Ignoring snapshot manifest from unconfirmed peer {}", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
self.clear_peer_download(peer_id);
|
||||
if !self.reset_peer_asking(peer_id, PeerAsking::SnapshotManifest) || self.state != SyncState::SnapshotManifest {
|
||||
trace!(target: "sync", "{}: Ignored unexpected manifest", peer_id);
|
||||
self.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let manifest_rlp = try!(r.at(0));
|
||||
let manifest = match ManifestData::from_rlp(&manifest_rlp.as_raw()) {
|
||||
Err(e) => {
|
||||
trace!(target: "sync", "{}: Ignored bad manifest: {:?}", peer_id, e);
|
||||
io.disconnect_peer(peer_id);
|
||||
self.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
Ok(manifest) => manifest,
|
||||
};
|
||||
self.snapshot.reset_to(&manifest, &manifest_rlp.as_raw().sha3());
|
||||
io.snapshot_service().begin_restore(manifest);
|
||||
self.state = SyncState::SnapshotData;
|
||||
|
||||
// give a task to the same peer first.
|
||||
self.sync_peer(io, peer_id, false);
|
||||
// give tasks to other peers
|
||||
self.continue_sync(io);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Called when snapshot data is downloaded from a peer.
|
||||
fn on_snapshot_data(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
|
||||
if !self.peers.get(&peer_id).map_or(false, |p| p.can_sync()) {
|
||||
trace!(target: "sync", "Ignoring snapshot data from unconfirmed peer {}", peer_id);
|
||||
return Ok(());
|
||||
}
|
||||
self.clear_peer_download(peer_id);
|
||||
if !self.reset_peer_asking(peer_id, PeerAsking::SnapshotData) || self.state != SyncState::SnapshotData {
|
||||
trace!(target: "sync", "{}: Ignored unexpected snapshot data", peer_id);
|
||||
self.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// check service status
|
||||
match io.snapshot_service().status() {
|
||||
RestorationStatus::Inactive | RestorationStatus::Failed => {
|
||||
trace!(target: "sync", "{}: Snapshot restoration aborted", peer_id);
|
||||
self.state = SyncState::Idle;
|
||||
self.snapshot.clear();
|
||||
self.continue_sync(io);
|
||||
return Ok(());
|
||||
},
|
||||
RestorationStatus::Ongoing { .. } => {
|
||||
trace!(target: "sync", "{}: Snapshot restoration is ongoing", peer_id);
|
||||
},
|
||||
}
|
||||
|
||||
let snapshot_data: Bytes = try!(r.val_at(0));
|
||||
match self.snapshot.validate_chunk(&snapshot_data) {
|
||||
Ok(ChunkType::Block(hash)) => {
|
||||
trace!(target: "sync", "{}: Processing block chunk", peer_id);
|
||||
io.snapshot_service().restore_block_chunk(hash, snapshot_data);
|
||||
}
|
||||
Ok(ChunkType::State(hash)) => {
|
||||
trace!(target: "sync", "{}: Processing state chunk", peer_id);
|
||||
io.snapshot_service().restore_state_chunk(hash, snapshot_data);
|
||||
}
|
||||
Err(()) => {
|
||||
trace!(target: "sync", "{}: Got bad snapshot chunk", peer_id);
|
||||
io.disconnect_peer(peer_id);
|
||||
self.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
if self.snapshot.is_complete() {
|
||||
// wait for snapshot restoration process to complete
|
||||
self.state = SyncState::SnapshotWaiting;
|
||||
}
|
||||
// give a task to the same peer first.
|
||||
self.sync_peer(io, peer_id, false);
|
||||
// give tasks to other peers
|
||||
self.continue_sync(io);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Called by peer when it is disconnecting
|
||||
pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) {
|
||||
trace!(target: "sync", "== Disconnecting {}: {}", peer, io.peer_info(peer));
|
||||
@@ -764,7 +898,7 @@ impl ChainSync {
|
||||
/// Called when a new peer is connected
|
||||
pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) {
|
||||
trace!(target: "sync", "== Connected {}: {}", peer, io.peer_info(peer));
|
||||
if let Err(e) = self.send_status(io) {
|
||||
if let Err(e) = self.send_status(io, peer) {
|
||||
debug!(target:"sync", "Error sending status request: {:?}", e);
|
||||
io.disable_peer(peer);
|
||||
}
|
||||
@@ -772,24 +906,27 @@ impl ChainSync {
|
||||
|
||||
/// Resume downloading
|
||||
fn continue_sync(&mut self, io: &mut SyncIo) {
|
||||
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().filter_map(|(k, p)|
|
||||
if p.can_sync() { Some((*k, p.difficulty.unwrap_or_else(U256::zero))) } else { None }).collect();
|
||||
let mut peers: Vec<(PeerId, U256, u32)> = self.peers.iter().filter_map(|(k, p)|
|
||||
if p.can_sync() { Some((*k, p.difficulty.unwrap_or_else(U256::zero), p.protocol_version)) } else { None }).collect();
|
||||
thread_rng().shuffle(&mut peers); //TODO: sort by rating
|
||||
// prefer peers with higher protocol version
|
||||
peers.sort_by(|&(_, _, ref v1), &(_, _, ref v2)| v1.cmp(v2));
|
||||
trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
|
||||
for (p, _) in peers {
|
||||
for (p, _, _) in peers {
|
||||
if self.active_peers.contains(&p) {
|
||||
self.sync_peer(io, p, false);
|
||||
}
|
||||
}
|
||||
if self.state != SyncState::Waiting && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.can_sync()) {
|
||||
self.complete_sync();
|
||||
if self.state != SyncState::Waiting && self.state != SyncState::SnapshotWaiting
|
||||
&& !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.can_sync()) {
|
||||
self.complete_sync(io);
|
||||
}
|
||||
}
|
||||
|
||||
/// Called after all blocks have been downloaded
|
||||
fn complete_sync(&mut self) {
|
||||
fn complete_sync(&mut self, io: &mut SyncIo) {
|
||||
trace!(target: "sync", "Sync complete");
|
||||
self.reset();
|
||||
self.reset(io);
|
||||
self.state = SyncState::Idle;
|
||||
}
|
||||
|
||||
@@ -805,7 +942,7 @@ impl ChainSync {
|
||||
trace!(target: "sync", "Skipping deactivated peer");
|
||||
return;
|
||||
}
|
||||
let (peer_latest, peer_difficulty) = {
|
||||
let (peer_latest, peer_difficulty, peer_snapshot_number, peer_snapshot_hash) = {
|
||||
let peer = self.peers.get_mut(&peer_id).unwrap();
|
||||
if peer.asking != PeerAsking::Nothing || !peer.can_sync() {
|
||||
return;
|
||||
@@ -814,7 +951,11 @@ impl ChainSync {
|
||||
trace!(target: "sync", "Waiting for the block queue");
|
||||
return;
|
||||
}
|
||||
(peer.latest_hash.clone(), peer.difficulty.clone())
|
||||
if self.state == SyncState::SnapshotWaiting {
|
||||
trace!(target: "sync", "Waiting for the snapshot restoration");
|
||||
return;
|
||||
}
|
||||
(peer.latest_hash.clone(), peer.difficulty.clone(), peer.snapshot_number.as_ref().cloned(), peer.snapshot_hash.as_ref().cloned())
|
||||
};
|
||||
let chain_info = io.chain().chain_info();
|
||||
let td = chain_info.pending_total_difficulty;
|
||||
@@ -823,13 +964,18 @@ impl ChainSync {
|
||||
if force || self.state == SyncState::NewBlocks || peer_difficulty.map_or(true, |pd| pd > syncing_difficulty) {
|
||||
match self.state {
|
||||
SyncState::Idle => {
|
||||
if self.last_imported_block < chain_info.best_block_number {
|
||||
self.last_imported_block = chain_info.best_block_number;
|
||||
self.last_imported_hash = chain_info.best_block_hash;
|
||||
// check if we can start snapshot sync with this peer
|
||||
if peer_snapshot_number.unwrap_or(0) > 0 && chain_info.best_block_number == 0 {
|
||||
self.start_snapshot_sync(io, peer_id);
|
||||
} else {
|
||||
if self.last_imported_block < chain_info.best_block_number {
|
||||
self.last_imported_block = chain_info.best_block_number;
|
||||
self.last_imported_hash = chain_info.best_block_hash;
|
||||
}
|
||||
trace!(target: "sync", "Starting sync with {}", peer_id);
|
||||
self.start_sync_round(io);
|
||||
self.sync_peer(io, peer_id, force);
|
||||
}
|
||||
trace!(target: "sync", "Starting sync with {}", peer_id);
|
||||
self.start_sync_round(io);
|
||||
self.sync_peer(io, peer_id, force);
|
||||
},
|
||||
SyncState::ChainHead => {
|
||||
// Request subchain headers
|
||||
@@ -843,8 +989,14 @@ impl ChainSync {
|
||||
if io.chain().block_status(BlockID::Hash(peer_latest)) == BlockStatus::Unknown {
|
||||
self.request_blocks(io, peer_id, false);
|
||||
}
|
||||
}
|
||||
SyncState::Waiting => ()
|
||||
},
|
||||
SyncState::SnapshotData => {
|
||||
if peer_snapshot_hash.is_some() && peer_snapshot_hash == self.snapshot.snapshot_hash() {
|
||||
self.request_snapshot_data(io, peer_id);
|
||||
}
|
||||
},
|
||||
SyncState::SnapshotManifest => (), //already downloading from other peer
|
||||
SyncState::Waiting | SyncState::SnapshotWaiting => ()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -903,6 +1055,16 @@ impl ChainSync {
|
||||
}
|
||||
}
|
||||
|
||||
/// Find some headers or blocks to download for a peer.
|
||||
fn request_snapshot_data(&mut self, io: &mut SyncIo, peer_id: PeerId) {
|
||||
self.clear_peer_download(peer_id);
|
||||
// find chunk data to download
|
||||
if let Some(hash) = self.snapshot.needed_chunk() {
|
||||
self.peers.get_mut(&peer_id).unwrap().asking_snapshot_data = Some(hash.clone());
|
||||
self.request_snapshot_chunk(io, peer_id, &hash);
|
||||
}
|
||||
}
|
||||
|
||||
/// Clear all blocks/headers marked as being downloaded by a peer.
|
||||
fn clear_peer_download(&mut self, peer_id: PeerId) {
|
||||
let peer = self.peers.get_mut(&peer_id).unwrap();
|
||||
@@ -917,9 +1079,15 @@ impl ChainSync {
|
||||
self.blocks.clear_body_download(b);
|
||||
}
|
||||
},
|
||||
PeerAsking::SnapshotData => {
|
||||
if let Some(hash) = peer.asking_snapshot_data {
|
||||
self.snapshot.clear_chunk_download(&hash);
|
||||
}
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
peer.asking_blocks.clear();
|
||||
peer.asking_snapshot_data = None;
|
||||
}
|
||||
|
||||
fn block_imported(&mut self, hash: &H256, number: BlockNumber, parent: &H256) {
|
||||
@@ -1016,6 +1184,22 @@ impl ChainSync {
|
||||
rlp.append(&if reverse {1u32} else {0u32});
|
||||
self.send_request(sync, peer_id, asking, GET_BLOCK_HEADERS_PACKET, rlp.out());
|
||||
}
|
||||
|
||||
/// Request snapshot manifest from a peer.
|
||||
fn request_snapshot_manifest(&mut self, sync: &mut SyncIo, peer_id: PeerId) {
|
||||
trace!(target: "sync", "{} <- GetSnapshotManifest", peer_id);
|
||||
let rlp = RlpStream::new_list(0);
|
||||
self.send_request(sync, peer_id, PeerAsking::SnapshotManifest, GET_SNAPSHOT_MANIFEST_PACKET, rlp.out());
|
||||
}
|
||||
|
||||
/// Request snapshot chunk from a peer.
|
||||
fn request_snapshot_chunk(&mut self, sync: &mut SyncIo, peer_id: PeerId, chunk: &H256) {
|
||||
trace!(target: "sync", "{} <- GetSnapshotData {:?}", peer_id, chunk);
|
||||
let mut rlp = RlpStream::new_list(1);
|
||||
rlp.append(chunk);
|
||||
self.send_request(sync, peer_id, PeerAsking::SnapshotData, GET_SNAPSHOT_DATA_PACKET, rlp.out());
|
||||
}
|
||||
|
||||
/// Request block bodies from a peer
|
||||
fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: PeerId, hashes: Vec<H256>) {
|
||||
let mut rlp = RlpStream::new_list(hashes.len());
|
||||
@@ -1086,14 +1270,22 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
/// Send Status message
|
||||
fn send_status(&mut self, io: &mut SyncIo) -> Result<(), NetworkError> {
|
||||
let mut packet = RlpStream::new_list(5);
|
||||
fn send_status(&mut self, io: &mut SyncIo, peer: PeerId) -> Result<(), NetworkError> {
|
||||
let pv64 = io.eth_protocol_version(peer) >= 64;
|
||||
let mut packet = RlpStream::new_list(if pv64 { 7 } else { 5 });
|
||||
let chain = io.chain().chain_info();
|
||||
packet.append(&(PROTOCOL_VERSION as u32));
|
||||
packet.append(&self.network_id);
|
||||
packet.append(&chain.total_difficulty);
|
||||
packet.append(&chain.best_block_hash);
|
||||
packet.append(&chain.genesis_hash);
|
||||
if pv64 {
|
||||
let manifest = io.snapshot_service().manifest();
|
||||
let block_number = manifest.as_ref().map_or(0, |m| m.block_number);
|
||||
let manifest_hash = manifest.map_or(H256::new(), |m| m.into_rlp().sha3());
|
||||
packet.append(&manifest_hash);
|
||||
packet.append(&block_number);
|
||||
}
|
||||
io.respond(STATUS_PACKET, packet.out())
|
||||
}
|
||||
|
||||
@@ -1230,6 +1422,48 @@ impl ChainSync {
|
||||
Ok(Some((RECEIPTS_PACKET, rlp_result)))
|
||||
}
|
||||
|
||||
/// Respond to GetSnapshotManifest request
|
||||
fn return_snapshot_manifest(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let count = r.item_count();
|
||||
trace!(target: "sync", "{} -> GetSnapshotManifest", peer_id);
|
||||
if count != 0 {
|
||||
debug!(target: "sync", "Invalid GetSnapshotManifest request, ignoring.");
|
||||
return Ok(None);
|
||||
}
|
||||
let rlp = match io.snapshot_service().manifest() {
|
||||
Some(manifest) => {
|
||||
trace!(target: "sync", "{} <- SnapshotManifest", peer_id);
|
||||
let mut rlp = RlpStream::new_list(1);
|
||||
rlp.append_raw(&manifest.into_rlp(), 1);
|
||||
rlp
|
||||
},
|
||||
None => {
|
||||
trace!(target: "sync", "{}: No manifest to return", peer_id);
|
||||
let rlp = RlpStream::new_list(0);
|
||||
rlp
|
||||
}
|
||||
};
|
||||
Ok(Some((SNAPSHOT_MANIFEST_PACKET, rlp)))
|
||||
}
|
||||
|
||||
/// Respond to GetSnapshotManifest request
|
||||
fn return_snapshot_data(io: &SyncIo, r: &UntrustedRlp, peer_id: PeerId) -> RlpResponseResult {
|
||||
let hash: H256 = try!(r.val_at(0));
|
||||
trace!(target: "sync", "{} -> GetSnapshotData {:?}", peer_id, hash);
|
||||
let rlp = match io.snapshot_service().chunk(hash) {
|
||||
Some(data) => {
|
||||
let mut rlp = RlpStream::new_list(1);
|
||||
rlp.append(&data);
|
||||
rlp
|
||||
},
|
||||
None => {
|
||||
let rlp = RlpStream::new_list(0);
|
||||
rlp
|
||||
}
|
||||
};
|
||||
Ok(Some((SNAPSHOT_DATA_PACKET, rlp)))
|
||||
}
|
||||
|
||||
fn return_rlp<FRlp, FError>(io: &mut SyncIo, rlp: &UntrustedRlp, peer: PeerId, rlp_func: FRlp, error_func: FError) -> Result<(), PacketDecodeError>
|
||||
where FRlp : Fn(&SyncIo, &UntrustedRlp, PeerId) -> RlpResponseResult,
|
||||
FError : FnOnce(NetworkError) -> String
|
||||
@@ -1266,6 +1500,14 @@ impl ChainSync {
|
||||
ChainSync::return_node_data,
|
||||
|e| format!("Error sending nodes: {:?}", e)),
|
||||
|
||||
GET_SNAPSHOT_MANIFEST_PACKET => ChainSync::return_rlp(io, &rlp, peer,
|
||||
ChainSync::return_snapshot_manifest,
|
||||
|e| format!("Error sending snapshot manifest: {:?}", e)),
|
||||
|
||||
GET_SNAPSHOT_DATA_PACKET => ChainSync::return_rlp(io, &rlp, peer,
|
||||
ChainSync::return_snapshot_data,
|
||||
|e| format!("Error sending snapshot data: {:?}", e)),
|
||||
|
||||
_ => {
|
||||
sync.write().on_packet(io, peer, packet_id, data);
|
||||
Ok(())
|
||||
@@ -1289,6 +1531,8 @@ impl ChainSync {
|
||||
BLOCK_BODIES_PACKET => self.on_peer_block_bodies(io, peer, &rlp),
|
||||
NEW_BLOCK_PACKET => self.on_peer_new_block(io, peer, &rlp),
|
||||
NEW_BLOCK_HASHES_PACKET => self.on_peer_new_hashes(io, peer, &rlp),
|
||||
SNAPSHOT_MANIFEST_PACKET => self.on_snapshot_manifest(io, peer, &rlp),
|
||||
SNAPSHOT_DATA_PACKET => self.on_snapshot_data(io, peer, &rlp),
|
||||
_ => {
|
||||
debug!(target: "sync", "Unknown packet {}", packet_id);
|
||||
Ok(())
|
||||
@@ -1308,6 +1552,8 @@ impl ChainSync {
|
||||
PeerAsking::BlockBodies => (tick - peer.ask_time) > BODIES_TIMEOUT_SEC,
|
||||
PeerAsking::Nothing => false,
|
||||
PeerAsking::ForkHeader => (tick - peer.ask_time) > FORK_HEADER_TIMEOUT_SEC,
|
||||
PeerAsking::SnapshotManifest => (tick - peer.ask_time) > SNAPSHOT_MANIFEST_TIMEOUT_SEC,
|
||||
PeerAsking::SnapshotData => (tick - peer.ask_time) > SNAPSHOT_DATA_TIMEOUT_SEC,
|
||||
};
|
||||
if timeout {
|
||||
trace!(target:"sync", "Timeout {}", peer_id);
|
||||
@@ -1321,9 +1567,12 @@ impl ChainSync {
|
||||
}
|
||||
|
||||
fn check_resume(&mut self, io: &mut SyncIo) {
|
||||
if !io.chain().queue_info().is_full() && self.state == SyncState::Waiting {
|
||||
if self.state == SyncState::Waiting && !io.chain().queue_info().is_full() && self.state == SyncState::Waiting {
|
||||
self.state = SyncState::Blocks;
|
||||
self.continue_sync(io);
|
||||
} else if self.state == SyncState::SnapshotWaiting && io.snapshot_service().status() == RestorationStatus::Inactive {
|
||||
self.state = SyncState::Idle;
|
||||
self.continue_sync(io);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1559,6 +1808,7 @@ impl ChainSync {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use tests::helpers::*;
|
||||
use tests::snapshot::TestSnapshotService;
|
||||
use super::*;
|
||||
use ::SyncConfig;
|
||||
use util::*;
|
||||
@@ -1612,7 +1862,8 @@ mod tests {
|
||||
fn return_receipts_empty() {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
let mut queue = VecDeque::new();
|
||||
let io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let result = ChainSync::return_receipts(&io, &UntrustedRlp::new(&[0xc0]), 0);
|
||||
|
||||
@@ -1624,7 +1875,8 @@ mod tests {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
let mut queue = VecDeque::new();
|
||||
let sync = dummy_sync_with_peer(H256::new(), &client);
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let mut receipt_list = RlpStream::new_list(4);
|
||||
receipt_list.append(&H256::from("0000000000000000000000000000000000000000000000005555555555555555"));
|
||||
@@ -1679,7 +1931,8 @@ mod tests {
|
||||
let hashes: Vec<_> = headers.iter().map(|h| HeaderView::new(h).sha3()).collect();
|
||||
|
||||
let mut queue = VecDeque::new();
|
||||
let io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let unknown: H256 = H256::new();
|
||||
let result = ChainSync::return_block_headers(&io, &UntrustedRlp::new(&make_hash_req(&unknown, 1, 0, false)), 0);
|
||||
@@ -1717,7 +1970,8 @@ mod tests {
|
||||
let mut client = TestBlockChainClient::new();
|
||||
let mut queue = VecDeque::new();
|
||||
let sync = dummy_sync_with_peer(H256::new(), &client);
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let mut node_list = RlpStream::new_list(3);
|
||||
node_list.append(&H256::from("0000000000000000000000000000000000000000000000005555555555555555"));
|
||||
@@ -1758,6 +2012,9 @@ mod tests {
|
||||
last_sent_transactions: HashSet::new(),
|
||||
expired: false,
|
||||
confirmation: super::ForkConfirmation::Confirmed,
|
||||
snapshot_number: None,
|
||||
snapshot_hash: None,
|
||||
asking_snapshot_data: None,
|
||||
});
|
||||
sync
|
||||
}
|
||||
@@ -1769,7 +2026,8 @@ mod tests {
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let lagging_peers = sync.get_lagging_peers(&chain_info, &io);
|
||||
|
||||
@@ -1800,7 +2058,8 @@ mod tests {
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let peers = sync.get_lagging_peers(&chain_info, &io);
|
||||
let peer_count = sync.propagate_new_hashes(&chain_info, &mut io, &peers);
|
||||
@@ -1820,7 +2079,8 @@ mod tests {
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
let peers = sync.get_lagging_peers(&chain_info, &io);
|
||||
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[], &peers);
|
||||
|
||||
@@ -1840,7 +2100,8 @@ mod tests {
|
||||
let hash = client.block_hash(BlockID::Number(99)).unwrap();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
let peers = sync.get_lagging_peers(&chain_info, &io);
|
||||
let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()], &peers);
|
||||
|
||||
@@ -1859,7 +2120,8 @@ mod tests {
|
||||
client.insert_transaction_to_queue();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
let peer_count = sync.propagate_new_transactions(&mut io);
|
||||
// Try to propagate same transactions for the second time
|
||||
let peer_count2 = sync.propagate_new_transactions(&mut io);
|
||||
@@ -1880,7 +2142,8 @@ mod tests {
|
||||
client.insert_transaction_to_queue();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
let peer_count = sync.propagate_new_transactions(&mut io);
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &[], &[], &[]);
|
||||
// Try to propagate same transactions for the second time
|
||||
@@ -1903,17 +2166,17 @@ mod tests {
|
||||
client.insert_transaction_to_queue();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(1), &client);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut ss = TestSnapshotService::new();
|
||||
// should sent some
|
||||
{
|
||||
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
let peer_count = sync.propagate_new_transactions(&mut io);
|
||||
assert_eq!(1, io.queue.len());
|
||||
assert_eq!(1, peer_count);
|
||||
}
|
||||
// Insert some more
|
||||
client.insert_transaction_to_queue();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
// Propagate new transactions
|
||||
let peer_count2 = sync.propagate_new_transactions(&mut io);
|
||||
// And now the peer should have all transactions
|
||||
@@ -1939,7 +2202,8 @@ mod tests {
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
//sync.have_common_block = true;
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let block = UntrustedRlp::new(&block_data);
|
||||
|
||||
@@ -1957,7 +2221,8 @@ mod tests {
|
||||
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let block = UntrustedRlp::new(&block_data);
|
||||
|
||||
@@ -1972,7 +2237,8 @@ mod tests {
|
||||
client.add_blocks(10, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let empty_data = vec![];
|
||||
let block = UntrustedRlp::new(&empty_data);
|
||||
@@ -1988,7 +2254,8 @@ mod tests {
|
||||
client.add_blocks(10, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let hashes_data = get_dummy_hashes();
|
||||
let hashes_rlp = UntrustedRlp::new(&hashes_data);
|
||||
@@ -2004,7 +2271,8 @@ mod tests {
|
||||
client.add_blocks(10, EachBlockWith::Uncle);
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let empty_hashes_data = vec![];
|
||||
let hashes_rlp = UntrustedRlp::new(&empty_hashes_data);
|
||||
@@ -2023,7 +2291,8 @@ mod tests {
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let peers = sync.get_lagging_peers(&chain_info, &io);
|
||||
sync.propagate_new_hashes(&chain_info, &mut io, &peers);
|
||||
@@ -2042,7 +2311,8 @@ mod tests {
|
||||
let mut queue = VecDeque::new();
|
||||
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client);
|
||||
let chain_info = client.chain_info();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
let peers = sync.get_lagging_peers(&chain_info, &io);
|
||||
sync.propagate_blocks(&chain_info, &mut io, &[], &peers);
|
||||
@@ -2076,7 +2346,8 @@ mod tests {
|
||||
// when
|
||||
{
|
||||
let mut queue = VecDeque::new();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks);
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[]);
|
||||
assert_eq!(io.chain.miner.status().transactions_in_future_queue, 0);
|
||||
@@ -2090,7 +2361,8 @@ mod tests {
|
||||
}
|
||||
{
|
||||
let mut queue = VecDeque::new();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &good_blocks, &retracted_blocks);
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks, &[]);
|
||||
}
|
||||
@@ -2114,7 +2386,8 @@ mod tests {
|
||||
let retracted_blocks = vec![client.block_hash_delta_minus(1)];
|
||||
|
||||
let mut queue = VecDeque::new();
|
||||
let mut io = TestIo::new(&mut client, &mut queue, None);
|
||||
let mut ss = TestSnapshotService::new();
|
||||
let mut io = TestIo::new(&mut client, &mut ss, &mut queue, None);
|
||||
|
||||
// when
|
||||
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[]);
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
use util::*;
|
||||
use network::*;
|
||||
use rlp::{UntrustedRlp, DecoderError, RlpStream, View, Stream};
|
||||
use ethcore::client::{BlockChainClient};
|
||||
use sync_io::SyncIo;
|
||||
use super::SyncConfig;
|
||||
|
||||
@@ -26,40 +26,6 @@
|
||||
//! Implements ethereum protocol version 63 as specified here:
|
||||
//! https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol
|
||||
//!
|
||||
//! Usage example:
|
||||
//!
|
||||
//! ```rust
|
||||
//! extern crate ethcore_util as util;
|
||||
//! extern crate ethcore_io as io;
|
||||
//! extern crate ethcore;
|
||||
//! extern crate ethsync;
|
||||
//! use std::env;
|
||||
//! use io::IoChannel;
|
||||
//! use ethcore::client::{Client, ClientConfig};
|
||||
//! use ethsync::{EthSync, SyncConfig, ManageNetwork, NetworkConfiguration};
|
||||
//! use ethcore::ethereum;
|
||||
//! use ethcore::miner::{GasPricer, Miner};
|
||||
//!
|
||||
//! fn main() {
|
||||
//! let dir = env::temp_dir();
|
||||
//! let spec = ethereum::new_frontier();
|
||||
//! let miner = Miner::new(
|
||||
//! Default::default(),
|
||||
//! GasPricer::new_fixed(20_000_000_000u64.into()),
|
||||
//! &spec,
|
||||
//! None
|
||||
//! );
|
||||
//! let client = Client::new(
|
||||
//! ClientConfig::default(),
|
||||
//! &spec,
|
||||
//! &dir,
|
||||
//! miner,
|
||||
//! IoChannel::disconnected()
|
||||
//! ).unwrap();
|
||||
//! let sync = EthSync::new(SyncConfig::default(), client, NetworkConfiguration::from(NetworkConfiguration::new())).unwrap();
|
||||
//! sync.start_network();
|
||||
//! }
|
||||
//! ```
|
||||
|
||||
extern crate ethcore_network as network;
|
||||
extern crate ethcore_io as io;
|
||||
@@ -84,6 +50,7 @@ mod chain;
|
||||
mod blocks;
|
||||
mod sync_io;
|
||||
mod infinity;
|
||||
mod snapshot;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
@@ -97,4 +64,3 @@ pub use api::{EthSync, SyncProvider, SyncClient, NetworkManagerClient, ManageNet
|
||||
ServiceConfiguration, NetworkConfiguration};
|
||||
pub use chain::{SyncStatus, SyncState};
|
||||
pub use network::{is_valid_node_url, NonReservedPeerMode, NetworkError};
|
||||
|
||||
|
||||
200
sync/src/snapshot.rs
Normal file
200
sync/src/snapshot.rs
Normal file
@@ -0,0 +1,200 @@
|
||||
// Copyright 2015, 2016 Ethcore (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
use util::{H256, Hashable};
|
||||
use std::collections::HashSet;
|
||||
use ethcore::snapshot::ManifestData;
|
||||
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub enum ChunkType {
|
||||
State(H256),
|
||||
Block(H256),
|
||||
}
|
||||
|
||||
pub struct Snapshot {
|
||||
pending_state_chunks: Vec<H256>,
|
||||
pending_block_chunks: Vec<H256>,
|
||||
downloading_chunks: HashSet<H256>,
|
||||
completed_chunks: HashSet<H256>,
|
||||
snapshot_hash: Option<H256>,
|
||||
}
|
||||
|
||||
impl Snapshot {
|
||||
/// Create a new instance.
|
||||
pub fn new() -> Snapshot {
|
||||
Snapshot {
|
||||
pending_state_chunks: Vec::new(),
|
||||
pending_block_chunks: Vec::new(),
|
||||
downloading_chunks: HashSet::new(),
|
||||
completed_chunks: HashSet::new(),
|
||||
snapshot_hash: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Clear everything.
|
||||
pub fn clear(&mut self) {
|
||||
self.pending_state_chunks.clear();
|
||||
self.pending_block_chunks.clear();
|
||||
self.downloading_chunks.clear();
|
||||
self.completed_chunks.clear();
|
||||
self.snapshot_hash = None;
|
||||
}
|
||||
|
||||
/// Reset collection for a manifest RLP
|
||||
pub fn reset_to(&mut self, manifest: &ManifestData, hash: &H256) {
|
||||
self.clear();
|
||||
self.pending_state_chunks = manifest.state_hashes.clone();
|
||||
self.pending_block_chunks = manifest.block_hashes.clone();
|
||||
self.snapshot_hash = Some(hash.clone());
|
||||
}
|
||||
|
||||
/// Validate chunk and mark it as downloaded
|
||||
pub fn validate_chunk(&mut self, chunk: &[u8]) -> Result<ChunkType, ()> {
|
||||
let hash = chunk.sha3();
|
||||
if self.completed_chunks.contains(&hash) {
|
||||
trace!(target: "sync", "Ignored proccessed chunk: {}", hash.hex());
|
||||
return Err(());
|
||||
}
|
||||
self.downloading_chunks.remove(&hash);
|
||||
if self.pending_block_chunks.iter().any(|h| h == &hash) {
|
||||
self.completed_chunks.insert(hash.clone());
|
||||
return Ok(ChunkType::Block(hash));
|
||||
}
|
||||
if self.pending_state_chunks.iter().any(|h| h == &hash) {
|
||||
self.completed_chunks.insert(hash.clone());
|
||||
return Ok(ChunkType::State(hash));
|
||||
}
|
||||
trace!(target: "sync", "Ignored unknown chunk: {}", hash.hex());
|
||||
Err(())
|
||||
}
|
||||
|
||||
/// Find a chunk to download
|
||||
pub fn needed_chunk(&mut self) -> Option<H256> {
|
||||
// check state chunks first
|
||||
let mut chunk = self.pending_state_chunks.iter()
|
||||
.find(|&h| !self.downloading_chunks.contains(h) && !self.completed_chunks.contains(h))
|
||||
.cloned();
|
||||
if chunk.is_none() {
|
||||
chunk = self.pending_block_chunks.iter()
|
||||
.find(|&h| !self.downloading_chunks.contains(h) && !self.completed_chunks.contains(h))
|
||||
.cloned();
|
||||
}
|
||||
|
||||
if let Some(hash) = chunk {
|
||||
self.downloading_chunks.insert(hash.clone());
|
||||
}
|
||||
chunk
|
||||
}
|
||||
|
||||
pub fn clear_chunk_download(&mut self, hash: &H256) {
|
||||
self.downloading_chunks.remove(hash);
|
||||
}
|
||||
|
||||
pub fn snapshot_hash(&self) -> Option<H256> {
|
||||
self.snapshot_hash
|
||||
}
|
||||
|
||||
pub fn total_chunks(&self) -> usize {
|
||||
self.pending_block_chunks.len() + self.pending_state_chunks.len()
|
||||
}
|
||||
|
||||
pub fn done_chunks(&self) -> usize {
|
||||
self.total_chunks() - self.completed_chunks.len()
|
||||
}
|
||||
|
||||
pub fn is_complete(&self) -> bool {
|
||||
self.total_chunks() == self.completed_chunks.len()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use util::*;
|
||||
use super::*;
|
||||
use ethcore::snapshot::ManifestData;
|
||||
|
||||
fn is_empty(snapshot: &Snapshot) -> bool {
|
||||
snapshot.pending_block_chunks.is_empty() &&
|
||||
snapshot.pending_state_chunks.is_empty() &&
|
||||
snapshot.completed_chunks.is_empty() &&
|
||||
snapshot.downloading_chunks.is_empty() &&
|
||||
snapshot.snapshot_hash.is_none()
|
||||
}
|
||||
|
||||
fn test_manifest() -> (ManifestData, H256, Vec<Bytes>, Vec<Bytes>) {
|
||||
let state_chunks: Vec<Bytes> = (0..20).map(|_| H256::random().to_vec()).collect();
|
||||
let block_chunks: Vec<Bytes> = (0..20).map(|_| H256::random().to_vec()).collect();
|
||||
let manifest = ManifestData {
|
||||
state_hashes: state_chunks.iter().map(|data| data.sha3()).collect(),
|
||||
block_hashes: block_chunks.iter().map(|data| data.sha3()).collect(),
|
||||
state_root: H256::new(),
|
||||
block_number: 42,
|
||||
block_hash: H256::new(),
|
||||
};
|
||||
let mhash = manifest.clone().into_rlp().sha3();
|
||||
(manifest, mhash, state_chunks, block_chunks)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn create_clear() {
|
||||
let mut snapshot = Snapshot::new();
|
||||
assert!(is_empty(&snapshot));
|
||||
let (manifest, mhash, _, _,) = test_manifest();
|
||||
snapshot.reset_to(&manifest, &mhash);
|
||||
assert!(!is_empty(&snapshot));
|
||||
snapshot.clear();
|
||||
assert!(is_empty(&snapshot));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_chunks() {
|
||||
let mut snapshot = Snapshot::new();
|
||||
let (manifest, mhash, state_chunks, block_chunks) = test_manifest();
|
||||
snapshot.reset_to(&manifest, &mhash);
|
||||
assert!(snapshot.validate_chunk(&H256::random().to_vec()).is_err());
|
||||
|
||||
let requested: Vec<H256> = (0..40).map(|_| snapshot.needed_chunk().unwrap()).collect();
|
||||
assert!(snapshot.needed_chunk().is_none());
|
||||
assert_eq!(&requested[0..20], &manifest.state_hashes[..]);
|
||||
assert_eq!(&requested[20..40], &manifest.block_hashes[..]);
|
||||
assert_eq!(snapshot.downloading_chunks.len(), 40);
|
||||
|
||||
assert_eq!(snapshot.validate_chunk(&state_chunks[4]), Ok(ChunkType::State(manifest.state_hashes[4].clone())));
|
||||
assert_eq!(snapshot.completed_chunks.len(), 1);
|
||||
assert_eq!(snapshot.downloading_chunks.len(), 39);
|
||||
|
||||
assert_eq!(snapshot.validate_chunk(&block_chunks[10]), Ok(ChunkType::Block(manifest.block_hashes[10].clone())));
|
||||
assert_eq!(snapshot.completed_chunks.len(), 2);
|
||||
assert_eq!(snapshot.downloading_chunks.len(), 38);
|
||||
|
||||
for (i, data) in state_chunks.iter().enumerate() {
|
||||
if i != 4 {
|
||||
assert!(snapshot.validate_chunk(data).is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
for (i, data) in block_chunks.iter().enumerate() {
|
||||
if i != 10 {
|
||||
assert!(snapshot.validate_chunk(data).is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
assert!(snapshot.is_complete());
|
||||
assert_eq!(snapshot.snapshot_hash(), Some(manifest.into_rlp().sha3()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,8 @@
|
||||
|
||||
use network::{NetworkContext, PeerId, PacketId, NetworkError};
|
||||
use ethcore::client::BlockChainClient;
|
||||
use ethcore::snapshot::SnapshotService;
|
||||
use api::ETH_PROTOCOL;
|
||||
|
||||
/// IO interface for the syning handler.
|
||||
/// Provides peer connection management and an interface to the blockchain client.
|
||||
@@ -31,10 +33,14 @@ pub trait SyncIo {
|
||||
fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), NetworkError>;
|
||||
/// Get the blockchain
|
||||
fn chain(&self) -> &BlockChainClient;
|
||||
/// Get the snapshot service.
|
||||
fn snapshot_service(&self) -> &SnapshotService;
|
||||
/// Returns peer client identifier string
|
||||
fn peer_info(&self, peer_id: PeerId) -> String {
|
||||
peer_id.to_string()
|
||||
}
|
||||
/// Maximum mutuallt supported ETH protocol version
|
||||
fn eth_protocol_version(&self, peer_id: PeerId) -> u8;
|
||||
/// Returns if the chain block queue empty
|
||||
fn is_chain_queue_empty(&self) -> bool {
|
||||
self.chain().queue_info().is_empty()
|
||||
@@ -46,15 +52,17 @@ pub trait SyncIo {
|
||||
/// Wraps `NetworkContext` and the blockchain client
|
||||
pub struct NetSyncIo<'s, 'h> where 'h: 's {
|
||||
network: &'s NetworkContext<'h>,
|
||||
chain: &'s BlockChainClient
|
||||
chain: &'s BlockChainClient,
|
||||
snapshot_service: &'s SnapshotService,
|
||||
}
|
||||
|
||||
impl<'s, 'h> NetSyncIo<'s, 'h> {
|
||||
/// Creates a new instance from the `NetworkContext` and the blockchain client reference.
|
||||
pub fn new(network: &'s NetworkContext<'h>, chain: &'s BlockChainClient) -> NetSyncIo<'s, 'h> {
|
||||
pub fn new(network: &'s NetworkContext<'h>, chain: &'s BlockChainClient, snapshot_service: &'s SnapshotService) -> NetSyncIo<'s, 'h> {
|
||||
NetSyncIo {
|
||||
network: network,
|
||||
chain: chain,
|
||||
snapshot_service: snapshot_service,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -80,6 +88,10 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
|
||||
self.chain
|
||||
}
|
||||
|
||||
fn snapshot_service(&self) -> &SnapshotService {
|
||||
self.snapshot_service
|
||||
}
|
||||
|
||||
fn peer_info(&self, peer_id: PeerId) -> String {
|
||||
self.network.peer_info(peer_id)
|
||||
}
|
||||
@@ -87,6 +99,10 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
|
||||
fn is_expired(&self) -> bool {
|
||||
self.network.is_expired()
|
||||
}
|
||||
|
||||
fn eth_protocol_version(&self, peer_id: PeerId) -> u8 {
|
||||
self.network.protocol_version(peer_id, ETH_PROTOCOL).unwrap_or(0)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -16,22 +16,26 @@
|
||||
|
||||
use util::*;
|
||||
use network::*;
|
||||
use tests::snapshot::*;
|
||||
use ethcore::client::{TestBlockChainClient, BlockChainClient};
|
||||
use ethcore::header::BlockNumber;
|
||||
use ethcore::snapshot::SnapshotService;
|
||||
use sync_io::SyncIo;
|
||||
use chain::ChainSync;
|
||||
use ::SyncConfig;
|
||||
|
||||
pub struct TestIo<'p> {
|
||||
pub chain: &'p mut TestBlockChainClient,
|
||||
pub snapshot_service: &'p TestSnapshotService,
|
||||
pub queue: &'p mut VecDeque<TestPacket>,
|
||||
pub sender: Option<PeerId>,
|
||||
}
|
||||
|
||||
impl<'p> TestIo<'p> {
|
||||
pub fn new(chain: &'p mut TestBlockChainClient, queue: &'p mut VecDeque<TestPacket>, sender: Option<PeerId>) -> TestIo<'p> {
|
||||
pub fn new(chain: &'p mut TestBlockChainClient, ss: &'p TestSnapshotService, queue: &'p mut VecDeque<TestPacket>, sender: Option<PeerId>) -> TestIo<'p> {
|
||||
TestIo {
|
||||
chain: chain,
|
||||
snapshot_service: ss,
|
||||
queue: queue,
|
||||
sender: sender
|
||||
}
|
||||
@@ -70,6 +74,14 @@ impl<'p> SyncIo for TestIo<'p> {
|
||||
fn chain(&self) -> &BlockChainClient {
|
||||
self.chain
|
||||
}
|
||||
|
||||
fn snapshot_service(&self) -> &SnapshotService {
|
||||
self.snapshot_service
|
||||
}
|
||||
|
||||
fn eth_protocol_version(&self, _peer: PeerId) -> u8 {
|
||||
64
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TestPacket {
|
||||
@@ -80,6 +92,7 @@ pub struct TestPacket {
|
||||
|
||||
pub struct TestPeer {
|
||||
pub chain: TestBlockChainClient,
|
||||
pub snapshot_service: Arc<TestSnapshotService>,
|
||||
pub sync: RwLock<ChainSync>,
|
||||
pub queue: VecDeque<TestPacket>,
|
||||
}
|
||||
@@ -103,9 +116,11 @@ impl TestNet {
|
||||
let chain = TestBlockChainClient::new();
|
||||
let mut config = SyncConfig::default();
|
||||
config.fork_block = fork;
|
||||
let ss = Arc::new(TestSnapshotService::new());
|
||||
let sync = ChainSync::new(config, &chain);
|
||||
net.peers.push(TestPeer {
|
||||
sync: RwLock::new(sync),
|
||||
snapshot_service: ss,
|
||||
chain: chain,
|
||||
queue: VecDeque::new(),
|
||||
});
|
||||
@@ -126,7 +141,7 @@ impl TestNet {
|
||||
for client in 0..self.peers.len() {
|
||||
if peer != client {
|
||||
let mut p = self.peers.get_mut(peer).unwrap();
|
||||
p.sync.write().on_peer_connected(&mut TestIo::new(&mut p.chain, &mut p.queue, Some(client as PeerId)), client as PeerId);
|
||||
p.sync.write().on_peer_connected(&mut TestIo::new(&mut p.chain, &p.snapshot_service, &mut p.queue, Some(client as PeerId)), client as PeerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -137,22 +152,22 @@ impl TestNet {
|
||||
if let Some(packet) = self.peers[peer].queue.pop_front() {
|
||||
let mut p = self.peers.get_mut(packet.recipient).unwrap();
|
||||
trace!("--- {} -> {} ---", peer, packet.recipient);
|
||||
ChainSync::dispatch_packet(&p.sync, &mut TestIo::new(&mut p.chain, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data);
|
||||
ChainSync::dispatch_packet(&p.sync, &mut TestIo::new(&mut p.chain, &p.snapshot_service, &mut p.queue, Some(peer as PeerId)), peer as PeerId, packet.packet_id, &packet.data);
|
||||
trace!("----------------");
|
||||
}
|
||||
let mut p = self.peers.get_mut(peer).unwrap();
|
||||
p.sync.write().maintain_sync(&mut TestIo::new(&mut p.chain, &mut p.queue, None));
|
||||
p.sync.write().maintain_sync(&mut TestIo::new(&mut p.chain, &p.snapshot_service, &mut p.queue, None));
|
||||
}
|
||||
}
|
||||
|
||||
pub fn sync_step_peer(&mut self, peer_num: usize) {
|
||||
let mut peer = self.peer_mut(peer_num);
|
||||
peer.sync.write().maintain_sync(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
|
||||
peer.sync.write().maintain_sync(&mut TestIo::new(&mut peer.chain, &peer.snapshot_service, &mut peer.queue, None));
|
||||
}
|
||||
|
||||
pub fn restart_peer(&mut self, i: usize) {
|
||||
let peer = self.peer_mut(i);
|
||||
peer.sync.write().restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None));
|
||||
peer.sync.write().restart(&mut TestIo::new(&mut peer.chain, &peer.snapshot_service, &mut peer.queue, None));
|
||||
}
|
||||
|
||||
pub fn sync(&mut self) -> u32 {
|
||||
@@ -181,6 +196,6 @@ impl TestNet {
|
||||
|
||||
pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
|
||||
let mut peer = self.peer_mut(peer_id);
|
||||
peer.sync.write().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[], &[]);
|
||||
peer.sync.write().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &peer.snapshot_service, &mut peer.queue, None), &[], &[], &[], &[], &[]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,5 +15,6 @@
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
pub mod helpers;
|
||||
pub mod snapshot;
|
||||
mod chain;
|
||||
mod rpc;
|
||||
|
||||
123
sync/src/tests/snapshot.rs
Normal file
123
sync/src/tests/snapshot.rs
Normal file
@@ -0,0 +1,123 @@
|
||||
// Copyright 2015, 2016 Ethcore (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use util::*;
|
||||
use ethcore::snapshot::{SnapshotService, ManifestData, RestorationStatus};
|
||||
use ethcore::header::BlockNumber;
|
||||
use ethcore::client::{EachBlockWith};
|
||||
use super::helpers::*;
|
||||
|
||||
pub struct TestSnapshotService {
|
||||
manifest: Option<ManifestData>,
|
||||
chunks: HashMap<H256, Bytes>,
|
||||
|
||||
restoration_manifest: Mutex<Option<ManifestData>>,
|
||||
state_restoration_chunks: Mutex<HashMap<H256, Bytes>>,
|
||||
block_restoration_chunks: Mutex<HashMap<H256, Bytes>>,
|
||||
}
|
||||
|
||||
impl TestSnapshotService {
|
||||
pub fn new() -> TestSnapshotService {
|
||||
TestSnapshotService {
|
||||
manifest: None,
|
||||
chunks: HashMap::new(),
|
||||
restoration_manifest: Mutex::new(None),
|
||||
state_restoration_chunks: Mutex::new(HashMap::new()),
|
||||
block_restoration_chunks: Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_snapshot(num_chunks: usize, block_hash: H256, block_number: BlockNumber) -> TestSnapshotService {
|
||||
let num_state_chunks = num_chunks / 2;
|
||||
let num_block_chunks = num_chunks - num_state_chunks;
|
||||
let state_chunks: Vec<Bytes> = (0..num_state_chunks).map(|_| H256::random().to_vec()).collect();
|
||||
let block_chunks: Vec<Bytes> = (0..num_block_chunks).map(|_| H256::random().to_vec()).collect();
|
||||
let manifest = ManifestData {
|
||||
state_hashes: state_chunks.iter().map(|data| data.sha3()).collect(),
|
||||
block_hashes: block_chunks.iter().map(|data| data.sha3()).collect(),
|
||||
state_root: H256::new(),
|
||||
block_number: block_number,
|
||||
block_hash: block_hash,
|
||||
};
|
||||
let mut chunks: HashMap<H256, Bytes> = state_chunks.into_iter().map(|data| (data.sha3(), data)).collect();
|
||||
chunks.extend(block_chunks.into_iter().map(|data| (data.sha3(), data)));
|
||||
TestSnapshotService {
|
||||
manifest: Some(manifest),
|
||||
chunks: chunks,
|
||||
restoration_manifest: Mutex::new(None),
|
||||
state_restoration_chunks: Mutex::new(HashMap::new()),
|
||||
block_restoration_chunks: Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SnapshotService for TestSnapshotService {
|
||||
fn manifest(&self) -> Option<ManifestData> {
|
||||
self.manifest.as_ref().cloned()
|
||||
}
|
||||
|
||||
fn chunk(&self, hash: H256) -> Option<Bytes> {
|
||||
self.chunks.get(&hash).cloned()
|
||||
}
|
||||
|
||||
fn status(&self) -> RestorationStatus {
|
||||
match &*self.restoration_manifest.lock() {
|
||||
&Some(ref manifest) if self.state_restoration_chunks.lock().len() == manifest.state_hashes.len() &&
|
||||
self.block_restoration_chunks.lock().len() == manifest.block_hashes.len() => RestorationStatus::Inactive,
|
||||
&Some(_) => RestorationStatus::Ongoing {
|
||||
state_chunks_done: self.state_restoration_chunks.lock().len() as u32,
|
||||
block_chunks_done: self.block_restoration_chunks.lock().len() as u32,
|
||||
},
|
||||
&None => RestorationStatus::Inactive,
|
||||
}
|
||||
}
|
||||
|
||||
fn begin_restore(&self, manifest: ManifestData) {
|
||||
*self.restoration_manifest.lock() = Some(manifest);
|
||||
self.state_restoration_chunks.lock().clear();
|
||||
self.block_restoration_chunks.lock().clear();
|
||||
}
|
||||
|
||||
fn abort_restore(&self) {
|
||||
*self.restoration_manifest.lock() = None;
|
||||
self.state_restoration_chunks.lock().clear();
|
||||
self.block_restoration_chunks.lock().clear();
|
||||
}
|
||||
|
||||
fn restore_state_chunk(&self, hash: H256, chunk: Bytes) {
|
||||
if self.restoration_manifest.lock().as_ref().map_or(false, |ref m| m.state_hashes.iter().any(|h| h == &hash)) {
|
||||
self.state_restoration_chunks.lock().insert(hash, chunk);
|
||||
}
|
||||
}
|
||||
|
||||
fn restore_block_chunk(&self, hash: H256, chunk: Bytes) {
|
||||
if self.restoration_manifest.lock().as_ref().map_or(false, |ref m| m.block_hashes.iter().any(|h| h == &hash)) {
|
||||
self.block_restoration_chunks.lock().insert(hash, chunk);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn snapshot_sync() {
|
||||
::env_logger::init().ok();
|
||||
let mut net = TestNet::new(2);
|
||||
net.peer_mut(0).snapshot_service = Arc::new(TestSnapshotService::new_with_snapshot(16, H256::new(), 1));
|
||||
net.peer_mut(0).chain.add_blocks(1, EachBlockWith::Nothing);
|
||||
net.sync_steps(19); // status + manifest + chunks
|
||||
assert_eq!(net.peer(1).snapshot_service.state_restoration_chunks.lock().len(), net.peer(0).snapshot_service.manifest.as_ref().unwrap().state_hashes.len());
|
||||
assert_eq!(net.peer(1).snapshot_service.block_restoration_chunks.lock().len(), net.peer(0).snapshot_service.manifest.as_ref().unwrap().block_hashes.len());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user