diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index b81dcda4e..691bdd614 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -393,6 +393,10 @@ usage! { "--no-serve-light", "Disable serving of light peers.", + ARG arg_warp_barrier: (Option) = None, or |c: &Config| c.network.as_ref()?.warp_barrier.clone(), + "--warp-barrier=[NUM]", + "When warp enabled never attempt regular sync before warping to block NUM.", + ARG arg_port: (u16) = 30303u16, or |c: &Config| c.network.as_ref()?.port.clone(), "--port=[PORT]", "Override the port on which the node should listen.", @@ -1044,6 +1048,7 @@ struct Ui { #[serde(deny_unknown_fields)] struct Network { warp: Option, + warp_barrier: Option, port: Option, min_peers: Option, max_peers: Option, @@ -1625,6 +1630,7 @@ mod tests { flag_geth: false, flag_testnet: false, flag_import_geth_keys: false, + arg_warp_barrier: None, arg_datadir: None, arg_networkid: None, arg_peers: None, @@ -1730,6 +1736,7 @@ mod tests { }), network: Some(Network { warp: Some(false), + warp_barrier: None, port: None, min_peers: Some(10), max_peers: Some(20), diff --git a/parity/configuration.rs b/parity/configuration.rs index 39c18f138..bff839039 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -368,6 +368,7 @@ impl Configuration { wal: wal, vm_type: vm_type, warp_sync: warp_sync, + warp_barrier: self.args.arg_warp_barrier, public_node: public_node, geth_compatibility: geth_compatibility, net_settings: self.network_settings()?, @@ -1401,6 +1402,7 @@ mod tests { network_id: None, public_node: false, warp_sync: true, + warp_barrier: None, acc_conf: Default::default(), gas_pricer_conf: Default::default(), miner_extras: Default::default(), diff --git a/parity/run.rs b/parity/run.rs index 19620f9d5..e7fb41cd7 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -101,6 +101,7 @@ pub struct RunCmd { pub net_conf: ethsync::NetworkConfiguration, pub network_id: Option, pub warp_sync: bool, + pub warp_barrier: Option, pub public_node: bool, pub acc_conf: AccountsConfig, pub gas_pricer_conf: GasPricerConfig, @@ -513,7 +514,7 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc) } sync_config.fork_block = spec.fork_block(); - let mut warp_sync = cmd.warp_sync; + let mut warp_sync = spec.engine.supports_warp() && cmd.warp_sync; if warp_sync { // Logging is not initialized yet, so we print directly to stderr if fat_db { @@ -527,7 +528,11 @@ pub fn execute_impl(cmd: RunCmd, can_restart: bool, logger: Arc) warp_sync = false; } } - sync_config.warp_sync = spec.engine.supports_warp() && warp_sync; + sync_config.warp_sync = match (warp_sync, cmd.warp_barrier) { + (true, Some(block)) => ethsync::WarpSync::OnlyAndAfter(block), + (true, _) => ethsync::WarpSync::Enabled, + _ => ethsync::WarpSync::Disabled, + }; sync_config.download_old_blocks = cmd.download_old_blocks; sync_config.serve_light = cmd.serve_light; diff --git a/sync/src/api.rs b/sync/src/api.rs index 38543de15..735b6eb03 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -45,6 +45,41 @@ pub const ETH_PROTOCOL: ProtocolId = *b"eth"; /// Ethereum light protocol pub const LIGHT_PROTOCOL: ProtocolId = *b"pip"; +/// Determine warp sync status. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum WarpSync { + /// Warp sync is enabled. + Enabled, + /// Warp sync is disabled. + Disabled, + /// Only warp sync is allowed (no regular sync) and only after given block number. + OnlyAndAfter(BlockNumber), +} + +impl WarpSync { + /// Returns true if warp sync is enabled. + pub fn is_enabled(&self) -> bool { + match *self { + WarpSync::Enabled => true, + WarpSync::OnlyAndAfter(_) => true, + WarpSync::Disabled => false, + } + } + + /// Returns `true` if we are in warp-only mode. + /// + /// i.e. we will never fall back to regular sync + /// until given block number is reached by + /// successfuly finding and restoring from a snapshot. + pub fn is_warp_only(&self) -> bool { + if let WarpSync::OnlyAndAfter(_) = *self { + true + } else { + false + } + } +} + /// Sync configuration #[derive(Debug, Clone, Copy)] pub struct SyncConfig { @@ -61,7 +96,7 @@ pub struct SyncConfig { /// Fork block to check pub fork_block: Option<(BlockNumber, H256)>, /// Enable snapshot sync - pub warp_sync: bool, + pub warp_sync: WarpSync, /// Enable light client server. pub serve_light: bool, } @@ -75,7 +110,7 @@ impl Default for SyncConfig { subprotocol_name: ETH_PROTOCOL, light_subprotocol_name: LIGHT_PROTOCOL, fork_block: None, - warp_sync: false, + warp_sync: WarpSync::Disabled, serve_light: false, } } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 6b5f370d3..305922d5a 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -105,7 +105,7 @@ use ethcore::error::*; use ethcore::snapshot::{ManifestData, RestorationStatus}; use transaction::PendingTransaction; use sync_io::SyncIo; -use super::SyncConfig; +use super::{WarpSync, SyncConfig}; use block_sync::{BlockDownloader, BlockRequest, BlockDownloaderImportError as DownloaderImportError, DownloadAction}; use rand::Rng; use snapshot::{Snapshot, ChunkType}; @@ -385,7 +385,7 @@ pub struct ChainSync { /// Enable ancient block downloading download_old_blocks: bool, /// Enable warp sync. - enable_warp_sync: bool, + warp_sync: WarpSync, } type RlpResponseResult = Result, PacketDecodeError>; @@ -394,9 +394,16 @@ impl ChainSync { /// Create a new instance of syncing strategy. pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync { let chain_info = chain.chain_info(); + let best_block = chain.chain_info().best_block_number; + let state = match config.warp_sync { + WarpSync::Enabled => SyncState::WaitingPeers, + WarpSync::OnlyAndAfter(block) if block > best_block => SyncState::WaitingPeers, + _ => SyncState::Idle, + }; + let mut sync = ChainSync { - state: if config.warp_sync { SyncState::WaitingPeers } else { SyncState::Idle }, - starting_block: chain.chain_info().best_block_number, + state, + starting_block: best_block, highest_block: None, peers: HashMap::new(), handshaking_peers: HashMap::new(), @@ -410,7 +417,7 @@ impl ChainSync { snapshot: Snapshot::new(), sync_start_time: None, transactions_stats: TransactionsStats::default(), - enable_warp_sync: config.warp_sync, + warp_sync: config.warp_sync, }; sync.update_targets(chain); sync @@ -508,10 +515,12 @@ impl ChainSync { } fn maybe_start_snapshot_sync(&mut self, io: &mut SyncIo) { - if !self.enable_warp_sync || io.snapshot_service().supported_versions().is_none() { + if !self.warp_sync.is_enabled() || io.snapshot_service().supported_versions().is_none() { + trace!(target: "sync", "Skipping warp sync. Disabled or not supported."); return; } if self.state != SyncState::WaitingPeers && self.state != SyncState::Blocks && self.state != SyncState::Waiting { + trace!(target: "sync", "Skipping warp sync. State: {:?}", self.state); return; } // Make sure the snapshot block is not too far away from best block and network best block and @@ -520,11 +529,16 @@ impl ChainSync { let fork_block = self.fork_block.as_ref().map(|&(n, _)| n).unwrap_or(0); let (best_hash, max_peers, snapshot_peers) = { + let expected_warp_block = match self.warp_sync { + WarpSync::OnlyAndAfter(block) => block, + _ => 0, + }; //collect snapshot infos from peers let snapshots = self.peers.iter() .filter(|&(_, p)| p.is_allowed() && p.snapshot_number.map_or(false, |sn| our_best_block < sn && (sn - our_best_block) > SNAPSHOT_RESTORE_THRESHOLD && sn > fork_block && + sn > expected_warp_block && self.highest_block.map_or(true, |highest| highest >= sn && (highest - sn) <= SNAPSHOT_RESTORE_THRESHOLD) )) .filter_map(|(p, peer)| peer.snapshot_hash.map(|hash| (p, hash.clone()))) @@ -554,7 +568,7 @@ impl ChainSync { trace!(target: "sync", "Starting unconfirmed snapshot sync {:?} with {:?}", hash, peers); self.start_snapshot_sync(io, peers); } - } else if timeout { + } else if timeout && !self.warp_sync.is_warp_only() { trace!(target: "sync", "No snapshots found, starting full sync"); self.state = SyncState::Idle; self.continue_sync(io); @@ -626,10 +640,6 @@ impl ChainSync { block_set: None, }; - if self.sync_start_time.is_none() { - self.sync_start_time = Some(Instant::now()); - } - trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{}, snapshot:{:?})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis, peer.snapshot_number); if io.is_expired() { @@ -658,6 +668,10 @@ impl ChainSync { return Ok(()); } + if self.sync_start_time.is_none() { + self.sync_start_time = Some(Instant::now()); + } + self.peers.insert(peer_id.clone(), peer); // Don't activate peer immediatelly when searching for common block. // Let the current sync round complete first. @@ -1167,9 +1181,14 @@ impl ChainSync { self.sync_peer(io, p, false); } } - if (self.state != SyncState::WaitingPeers && self.state != SyncState::SnapshotWaiting && self.state != SyncState::Waiting && self.state != SyncState::Idle) - && !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.block_set != Some(BlockSet::OldBlocks) && p.can_sync()) { + if + self.state != SyncState::WaitingPeers && + self.state != SyncState::SnapshotWaiting && + self.state != SyncState::Waiting && + self.state != SyncState::Idle && + !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.block_set != Some(BlockSet::OldBlocks) && p.can_sync()) + { self.complete_sync(io); } } @@ -1220,7 +1239,13 @@ impl ChainSync { if force || higher_difficulty || self.old_blocks.is_some() { match self.state { SyncState::WaitingPeers => { - trace!(target: "sync", "Checking snapshot sync: {} vs {}", peer_snapshot_number, chain_info.best_block_number); + trace!( + target: "sync", + "Checking snapshot sync: {} vs {} (peer: {})", + peer_snapshot_number, + chain_info.best_block_number, + peer_id + ); self.maybe_start_snapshot_sync(io); }, SyncState::Idle | SyncState::Blocks | SyncState::NewBlocks => { diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 517ff0d99..6b5ef65da 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use ethcore::client::{TestBlockChainClient, BlockChainClient, BlockId, EachBlockWith, ChainInfo, BlockInfo}; use chain::{SyncState}; use super::helpers::*; -use SyncConfig; +use {SyncConfig, WarpSync}; #[test] fn two_peers() { @@ -161,7 +161,7 @@ fn status_empty() { let net = TestNet::new(2); assert_eq!(net.peer(0).sync.read().status().state, SyncState::Idle); let mut config = SyncConfig::default(); - config.warp_sync = true; + config.warp_sync = WarpSync::Enabled; let net = TestNet::new_with_config(2, config); assert_eq!(net.peer(0).sync.read().status().state, SyncState::WaitingPeers); } diff --git a/sync/src/tests/snapshot.rs b/sync/src/tests/snapshot.rs index 516e3d7e2..2f6441f4f 100644 --- a/sync/src/tests/snapshot.rs +++ b/sync/src/tests/snapshot.rs @@ -24,7 +24,7 @@ use ethcore::snapshot::{SnapshotService, ManifestData, RestorationStatus}; use ethcore::header::BlockNumber; use ethcore::client::{EachBlockWith}; use super::helpers::*; -use SyncConfig; +use {SyncConfig, WarpSync}; pub struct TestSnapshotService { manifest: Option, @@ -127,7 +127,7 @@ impl SnapshotService for TestSnapshotService { fn snapshot_sync() { ::env_logger::init().ok(); let mut config = SyncConfig::default(); - config.warp_sync = true; + config.warp_sync = WarpSync::Enabled; let mut net = TestNet::new_with_config(5, config); let snapshot_service = Arc::new(TestSnapshotService::new_with_snapshot(16, H256::new(), 500000)); for i in 0..4 { diff --git a/util/network-devp2p/src/host.rs b/util/network-devp2p/src/host.rs index 9df2d0b38..51990b6f5 100644 --- a/util/network-devp2p/src/host.rs +++ b/util/network-devp2p/src/host.rs @@ -594,11 +594,11 @@ impl Host { }; match TcpStream::connect(&address) { Ok(socket) => { - trace!(target: "network", "Connecting to {:?}", address); + trace!(target: "network", "{}: Connecting to {:?}", id, address); socket }, Err(e) => { - debug!(target: "network", "Can't connect to address {:?}: {:?}", address, e); + debug!(target: "network", "{}: Can't connect to address {:?}: {:?}", id, address, e); return; } } @@ -614,6 +614,7 @@ impl Host { let mut sessions = self.sessions.write(); let token = sessions.insert_with_opt(|token| { + trace!(target: "network", "{}: Initiating session {:?}", token, id); match Session::new(io, socket, token, id, &nonce, &self.info.read()) { Ok(s) => Some(Arc::new(Mutex::new(s))), Err(e) => {