diff --git a/sync/src/api.rs b/sync/src/api.rs
index 67a81237a..abe18c3cd 100644
--- a/sync/src/api.rs
+++ b/sync/src/api.rs
@@ -58,7 +58,7 @@ impl Default for SyncConfig {
 			network_id: U256::from(1),
 			subprotocol_name: *b"eth",
 			fork_block: None,
-			warp_sync: true,
+			warp_sync: false,
 		}
 	}
 }
diff --git a/sync/src/chain.rs b/sync/src/chain.rs
index d18adb6ea..06949682b 100644
--- a/sync/src/chain.rs
+++ b/sync/src/chain.rs
@@ -123,6 +123,7 @@ const MAX_NEW_BLOCK_AGE: BlockNumber = 20;
 const MAX_TRANSACTION_SIZE: usize = 300*1024;
 // Min number of blocks to be behind for a snapshot sync
 const SNAPSHOT_RESTORE_THRESHOLD: BlockNumber = 100000;
+const SNAPSHOT_MIN_PEERS: usize = 3;
 
 const STATUS_PACKET: u8 = 0x00;
 const NEW_BLOCK_HASHES_PACKET: u8 = 0x01;
@@ -147,21 +148,27 @@ const SNAPSHOT_DATA_PACKET: u8 = 0x14;
 
 pub const SNAPSHOT_SYNC_PACKET_COUNT: u8 = 0x15;
 
-const HEADERS_TIMEOUT_SEC: f64 = 15f64;
-const BODIES_TIMEOUT_SEC: f64 = 10f64;
-const RECEIPTS_TIMEOUT_SEC: f64 = 10f64;
-const FORK_HEADER_TIMEOUT_SEC: f64 = 3f64;
-const SNAPSHOT_MANIFEST_TIMEOUT_SEC: f64 = 3f64;
-const SNAPSHOT_DATA_TIMEOUT_SEC: f64 = 60f64;
+const MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD: usize = 3;
+
+const WAIT_PEERS_TIMEOUT_SEC: u64 = 5;
+const STATUS_TIMEOUT_SEC: u64 = 5;
+const HEADERS_TIMEOUT_SEC: u64 = 15;
+const BODIES_TIMEOUT_SEC: u64 = 10;
+const RECEIPTS_TIMEOUT_SEC: u64 = 10;
+const FORK_HEADER_TIMEOUT_SEC: u64 = 3;
+const SNAPSHOT_MANIFEST_TIMEOUT_SEC: u64 = 3;
+const SNAPSHOT_DATA_TIMEOUT_SEC: u64 = 60;
 
 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
 /// Sync state
 pub enum SyncState {
-	/// Waiting for pv64 peers to start snapshot syncing
+	/// Collecting enough peers to start syncing.
+	WaitingPeers,
+	/// Waiting for snapshot manifest download
 	SnapshotManifest,
 	/// Downloading snapshot data
 	SnapshotData,
-	/// Waiting for snapshot restoration to complete
+	/// Waiting for snapshot restoration progress.
 	SnapshotWaiting,
 	/// Downloading new blocks
 	Blocks,
@@ -276,7 +283,7 @@ struct PeerInfo {
 	/// Holds requested snapshot chunk hash if any.
 	asking_snapshot_data: Option<H256>,
 	/// Request timestamp
-	ask_time: f64,
+	ask_time: u64,
 	/// Holds a set of transactions recently sent to this peer to avoid spamming.
 	last_sent_transactions: HashSet<H256>,
 	/// Pending request is expired and result should be ignored
@@ -324,10 +331,13 @@ pub struct ChainSync {
 	network_id: U256,
 	/// Optional fork block to check
 	fork_block: Option<(BlockNumber, H256)>,
-	/// Snapshot sync allowed.
-	snapshot_sync_enabled: bool,
 	/// Snapshot downloader.
 	snapshot: Snapshot,
+	/// Connected peers pending Status message.
+	/// Value is request timestamp.
+	handshaking_peers: HashMap<PeerId, u64>,
+	/// Sync start timestamp. Measured when first peer is connected
+	sync_start_time: Option<u64>,
 }
 
 type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
@@ -337,20 +347,21 @@ impl ChainSync {
 	pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync {
 		let chain_info = chain.chain_info();
 		let mut sync = ChainSync {
-			state: SyncState::Idle,
+			state: if config.warp_sync { SyncState::WaitingPeers } else { SyncState::Idle },
 			starting_block: chain.chain_info().best_block_number,
 			highest_block: None,
 			peers: HashMap::new(),
+			handshaking_peers: HashMap::new(),
 			active_peers: HashSet::new(),
 			new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number),
 			old_blocks: None,
 			last_sent_block_number: 0,
 			network_id: config.network_id,
 			fork_block: config.fork_block,
-			snapshot_sync_enabled: config.warp_sync,
 			snapshot: Snapshot::new(),
+			sync_start_time: None,
 		};
-		sync.init_downloaders(chain);
+		sync.update_targets(chain);
 		sync
 	}
 
@@ -442,20 +453,67 @@ impl ChainSync {
 		self.active_peers.remove(&peer_id);
 	}
 
-	fn start_snapshot_sync(&mut self, io: &mut SyncIo, peer_id: PeerId) {
+	fn maybe_start_snapshot_sync(&mut self, io: &mut SyncIo) {
+		if self.state != SyncState::WaitingPeers {
+			return;
+		}
+		let best_block = io.chain().chain_info().best_block_number;
+
+		let (best_hash, max_peers, snapshot_peers) = {
+			//collect snapshot infos from peers
+			let snapshots = self.peers.iter()
+				.filter(|&(_, p)| p.is_allowed() && p.snapshot_number.map_or(false, |sn| best_block < sn && (sn - best_block) > SNAPSHOT_RESTORE_THRESHOLD))
+				.filter_map(|(p, peer)| peer.snapshot_hash.map(|hash| (p, hash.clone())));
+
+			let mut snapshot_peers = HashMap::new();
+			let mut max_peers: usize = 0;
+			let mut best_hash = None;
+			for (p, hash) in snapshots {
+				let peers = snapshot_peers.entry(hash).or_insert_with(Vec::new);
+				peers.push(*p);
+				if peers.len() > max_peers {
+					max_peers = peers.len();
+					best_hash = Some(hash);
+				}
+			}
+			(best_hash, max_peers, snapshot_peers)
+		};
+
+		let timeout = self.sync_start_time.map_or(false, |t| ((time::precise_time_ns() - t) / 1_000_000_000) > WAIT_PEERS_TIMEOUT_SEC);
+
+		if let (Some(hash), Some(peers)) = (best_hash, best_hash.map_or(None, |h| snapshot_peers.get(&h))) {
+			if max_peers >= SNAPSHOT_MIN_PEERS {
+				trace!(target: "sync", "Starting confirmed snapshot sync {:?} with {:?}", hash, peers);
+				self.start_snapshot_sync(io, peers);
+			} else if timeout {
+				trace!(target: "sync", "Starting unconfirmed snapshot sync {:?} with {:?}", hash, peers);
+				self.start_snapshot_sync(io, peers);
+			}
+		} else if timeout {
+			trace!(target: "sync", "No snapshots found, starting full sync");
+			self.state = SyncState::Idle;
+			self.continue_sync(io);
+		}
+	}
+
+	fn start_snapshot_sync(&mut self, io: &mut SyncIo, peers: &[PeerId]) {
 		self.snapshot.clear();
-		self.request_snapshot_manifest(io, peer_id);
+		for p in peers {
+			if self.peers.get(p).map_or(false, |p| p.asking == PeerAsking::Nothing) {
+				self.request_snapshot_manifest(io, *p);
+			}
+		}
 		self.state = SyncState::SnapshotManifest;
 	}
 
 	/// Restart sync disregarding the block queue status. May end up re-downloading up to QUEUE_SIZE blocks
 	pub fn restart(&mut self, io: &mut SyncIo) {
-		self.init_downloaders(io.chain());
+		self.update_targets(io.chain());
 		self.reset_and_continue(io);
 	}
 
-	/// Restart sync after bad block has been detected. May end up re-downloading up to QUEUE_SIZE blocks
-	fn init_downloaders(&mut self, chain: &BlockChainClient) {
+	/// Update sync after the blockchain has been changed externally.
+	pub fn update_targets(&mut self, chain: &BlockChainClient) {
 		// Do not assume that the block queue/chain still has our last_imported_block
 		let chain = chain.chain_info();
 		self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number);
@@ -475,6 +533,7 @@ impl ChainSync {
 
 	/// Called by peer to report status
 	fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
+		self.handshaking_peers.remove(&peer_id);
 		let protocol_version: u8 = try!(r.val_at(0));
 		let warp_protocol = io.protocol_version(&WARP_SYNC_PROTOCOL_ID, peer_id) != 0;
 		let peer = PeerInfo {
@@ -486,7 +545,7 @@ impl ChainSync {
 			asking: PeerAsking::Nothing,
 			asking_blocks: Vec::new(),
 			asking_hash: None,
-			ask_time: 0f64,
+			ask_time: 0,
 			last_sent_transactions: HashSet::new(),
 			expired: false,
 			confirmation: if self.fork_block.is_none() { ForkConfirmation::Confirmed } else { ForkConfirmation::Unconfirmed },
@@ -496,7 +555,12 @@ impl ChainSync {
 			block_set: None,
 		};
 
-		trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
+		if self.sync_start_time.is_none() {
+			self.sync_start_time = Some(time::precise_time_ns());
+		}
+
+		trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{}, snapshot:{:?})",
+			peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis, peer.snapshot_number);
 		if io.is_expired() {
 			trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
 			return Ok(());
@@ -578,7 +642,7 @@ impl ChainSync {
 		}
 		let item_count = r.item_count();
 		trace!(target: "sync", "{} -> BlockHeaders ({} entries), state = {:?}, set = {:?}", peer_id, item_count, self.state, block_set);
-		if self.state == SyncState::Idle && self.old_blocks.is_none() {
+		if (self.state == SyncState::Idle || self.state == SyncState::WaitingPeers) && self.old_blocks.is_none() {
 			trace!(target: "sync", "Ignored unexpected block headers");
 			self.continue_sync(io);
 			return Ok(());
@@ -875,7 +939,7 @@ impl ChainSync {
 		}
 		self.clear_peer_download(peer_id);
 		if !self.reset_peer_asking(peer_id, PeerAsking::SnapshotManifest) || self.state != SyncState::SnapshotManifest {
-			trace!(target: "sync", "{}: Ignored unexpected manifest", peer_id);
+			trace!(target: "sync", "{}: Ignored unexpected/expired manifest", peer_id);
 			self.continue_sync(io);
 			return Ok(());
 		}
@@ -918,7 +982,7 @@ impl ChainSync {
 		match io.snapshot_service().status() {
 			RestorationStatus::Inactive | RestorationStatus::Failed => {
 				trace!(target: "sync", "{}: Snapshot restoration aborted", peer_id);
-				self.state = SyncState::Idle;
+				self.state = SyncState::WaitingPeers;
 				self.snapshot.clear();
 				self.continue_sync(io);
 				return Ok(());
@@ -960,6 +1024,7 @@ impl ChainSync {
 	/// Called by peer when it is disconnecting
 	pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) {
 		trace!(target: "sync", "== Disconnecting {}: {}", peer, io.peer_info(peer));
+		self.handshaking_peers.remove(&peer);
 		if self.peers.contains_key(&peer) {
 			debug!(target: "sync", "Disconnected {}", peer);
 			self.clear_peer_download(peer);
@@ -975,12 +1040,14 @@ impl ChainSync {
 		if let Err(e) = self.send_status(io, peer) {
 			debug!(target:"sync", "Error sending status request: {:?}", e);
 			io.disable_peer(peer);
+		} else {
+			self.handshaking_peers.insert(peer, time::precise_time_ns());
 		}
 	}
 
 	/// Resume downloading
 	fn continue_sync(&mut self, io: &mut SyncIo) {
-		if self.state != SyncState::Waiting && self.state != SyncState::SnapshotWaiting
+		if (self.state == SyncState::Blocks || self.state == SyncState::NewBlocks || self.state == SyncState::Idle)
 			&& !self.peers.values().any(|p| p.asking != PeerAsking::Nothing && p.block_set != Some(BlockSet::OldBlocks) && p.can_sync()) {
 			self.complete_sync(io);
 		}
@@ -1040,11 +1107,9 @@ impl ChainSync {
 		let higher_difficulty = peer_difficulty.map_or(true, |pd| pd > syncing_difficulty);
 		if force || self.state == SyncState::NewBlocks || higher_difficulty || self.old_blocks.is_some() {
 			match self.state {
-				SyncState::Idle if self.snapshot_sync_enabled
-					&& chain_info.best_block_number < peer_snapshot_number
-					&& (peer_snapshot_number - chain_info.best_block_number) > SNAPSHOT_RESTORE_THRESHOLD => {
-					trace!(target: "sync", "Starting snapshot sync: {} vs {}", peer_snapshot_number, chain_info.best_block_number);
-					self.start_snapshot_sync(io, peer_id);
+				SyncState::WaitingPeers => {
+					trace!(target: "sync", "Checking snapshot sync: {} vs {}", peer_snapshot_number, chain_info.best_block_number);
+					self.maybe_start_snapshot_sync(io);
 				},
 				SyncState::Idle | SyncState::Blocks | SyncState::NewBlocks => {
 					if io.chain().queue_info().is_full() {
@@ -1070,6 +1135,13 @@ impl ChainSync {
 					}
 				},
 				SyncState::SnapshotData => {
+					if let RestorationStatus::Ongoing { state_chunks: _, block_chunks: _, state_chunks_done, block_chunks_done, } = io.snapshot_service().status() {
+						if self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize > MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
+							trace!(target: "sync", "Snapshot queue full, pausing sync");
+							self.state = SyncState::SnapshotWaiting;
+							return;
+						}
+					}
 					if peer_snapshot_hash.is_some() && peer_snapshot_hash == self.snapshot.snapshot_hash() {
 						self.request_snapshot_data(io, peer_id);
 					}
@@ -1253,7 +1325,7 @@ impl ChainSync {
 			warn!(target:"sync", "Asking {:?} while requesting {:?}", peer.asking, asking);
 		}
 		peer.asking = asking;
-		peer.ask_time = time::precise_time_s();
+		peer.ask_time = time::precise_time_ns();
 		let result = if packet_id >= ETH_PACKET_COUNT {
 			sync.send_protocol(WARP_SYNC_PROTOCOL_ID, peer_id, packet_id, packet)
 		} else {
@@ -1590,17 +1662,18 @@ impl ChainSync {
 
 	#[cfg_attr(feature="dev", allow(match_same_arms))]
 	pub fn maintain_peers(&mut self, io: &mut SyncIo) {
-		let tick = time::precise_time_s();
+		let tick = time::precise_time_ns();
 		let mut aborting = Vec::new();
 		for (peer_id, peer) in &self.peers {
+			let elapsed = (tick - peer.ask_time) / 1_000_000_000;
 			let timeout = match peer.asking {
-				PeerAsking::BlockHeaders => (tick - peer.ask_time) > HEADERS_TIMEOUT_SEC,
-				PeerAsking::BlockBodies => (tick - peer.ask_time) > BODIES_TIMEOUT_SEC,
-				PeerAsking::BlockReceipts => (tick - peer.ask_time) > RECEIPTS_TIMEOUT_SEC,
+				PeerAsking::BlockHeaders => elapsed > HEADERS_TIMEOUT_SEC,
+				PeerAsking::BlockBodies => elapsed > BODIES_TIMEOUT_SEC,
+				PeerAsking::BlockReceipts => elapsed > RECEIPTS_TIMEOUT_SEC,
 				PeerAsking::Nothing => false,
-				PeerAsking::ForkHeader => (tick - peer.ask_time) > FORK_HEADER_TIMEOUT_SEC,
-				PeerAsking::SnapshotManifest => (tick - peer.ask_time) > SNAPSHOT_MANIFEST_TIMEOUT_SEC,
-				PeerAsking::SnapshotData => (tick - peer.ask_time) > SNAPSHOT_DATA_TIMEOUT_SEC,
+				PeerAsking::ForkHeader => elapsed > FORK_HEADER_TIMEOUT_SEC,
+				PeerAsking::SnapshotManifest => elapsed > SNAPSHOT_MANIFEST_TIMEOUT_SEC,
+				PeerAsking::SnapshotData => elapsed > SNAPSHOT_DATA_TIMEOUT_SEC,
 			};
 			if timeout {
 				trace!(target:"sync", "Timeout {}", peer_id);
@@ -1611,16 +1684,42 @@ impl ChainSync {
 		for p in aborting {
 			self.on_peer_aborting(io, p);
 		}
+
+		// Check for handshake timeouts
+		for (peer, ask_time) in &self.handshaking_peers {
+			let elapsed = (tick - ask_time) / 1_000_000_000;
+			if elapsed > STATUS_TIMEOUT_SEC {
+				trace!(target:"sync", "Status timeout {}", peer);
+				io.disconnect_peer(*peer);
+			}
+		}
 	}
 
 	fn check_resume(&mut self, io: &mut SyncIo) {
 		if self.state == SyncState::Waiting && !io.chain().queue_info().is_full() && self.state == SyncState::Waiting {
 			self.state = SyncState::Blocks;
 			self.continue_sync(io);
-		} else if self.state == SyncState::SnapshotWaiting && io.snapshot_service().status() == RestorationStatus::Inactive {
-			trace!(target:"sync", "Snapshot restoration is complete");
-			self.restart(io);
-			self.continue_sync(io);
+		} else if self.state == SyncState::SnapshotWaiting {
+			match io.snapshot_service().status() {
+				RestorationStatus::Inactive => {
+					trace!(target:"sync", "Snapshot restoration is complete");
+					self.restart(io);
+					self.continue_sync(io);
+				},
+				RestorationStatus::Ongoing { state_chunks: _, block_chunks: _, state_chunks_done, block_chunks_done, } => {
+					if !self.snapshot.is_complete() && self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize <= MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
+						trace!(target:"sync", "Resuming snapshot sync");
+						self.state = SyncState::SnapshotData;
+						self.continue_sync(io);
+					}
+				},
+				RestorationStatus::Failed => {
+					trace!(target: "sync", "Snapshot restoration aborted");
+					self.state = SyncState::WaitingPeers;
+					self.snapshot.clear();
+					self.continue_sync(io);
+				},
+			}
 		}
 	}
 
@@ -1828,6 +1927,7 @@ impl ChainSync {
 
 	/// Maintain other peers. Send out any new blocks and transactions
 	pub fn maintain_sync(&mut self, io: &mut SyncIo) {
+		self.maybe_start_snapshot_sync(io);
 		self.check_resume(io);
 	}
 
@@ -2050,7 +2150,7 @@ mod tests {
 			asking: PeerAsking::Nothing,
 			asking_blocks: Vec::new(),
 			asking_hash: None,
-			ask_time: 0f64,
+			ask_time: 0,
 			last_sent_transactions: HashSet::new(),
 			expired: false,
 			confirmation: super::ForkConfirmation::Confirmed,
diff --git a/sync/src/snapshot.rs b/sync/src/snapshot.rs
index ca9adf220..9f4262105 100644
--- a/sync/src/snapshot.rs
+++ b/sync/src/snapshot.rs
@@ -113,7 +113,7 @@ impl Snapshot {
 	}
 
 	pub fn done_chunks(&self) -> usize {
-		self.total_chunks() - self.completed_chunks.len()
+		self.completed_chunks.len()
 	}
 
 	pub fn is_complete(&self) -> bool {
@@ -165,6 +165,7 @@ mod test {
 		let mut snapshot = Snapshot::new();
 		let (manifest, mhash, state_chunks, block_chunks) = test_manifest();
 		snapshot.reset_to(&manifest, &mhash);
+		assert_eq!(snapshot.done_chunks(), 0);
 		assert!(snapshot.validate_chunk(&H256::random().to_vec()).is_err());
 
 		let requested: Vec<H256> = (0..40).map(|_| snapshot.needed_chunk().unwrap()).collect();
@@ -194,6 +195,8 @@ mod test {
 		}
 
 		assert!(snapshot.is_complete());
+		assert_eq!(snapshot.done_chunks(), 40);
+		assert_eq!(snapshot.done_chunks(), snapshot.total_chunks());
 		assert_eq!(snapshot.snapshot_hash(), Some(manifest.into_rlp().sha3()));
 	}
 }
diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs
index c54529beb..17c051162 100644
--- a/sync/src/tests/chain.rs
+++ b/sync/src/tests/chain.rs
@@ -18,6 +18,7 @@ use util::*;
 use ethcore::client::{TestBlockChainClient, BlockChainClient, BlockID, EachBlockWith};
 use chain::{SyncState};
 use super::helpers::*;
+use SyncConfig;
 
 #[test]
 fn two_peers() {
@@ -156,6 +157,10 @@ fn restart() {
 fn status_empty() {
 	let net = TestNet::new(2);
 	assert_eq!(net.peer(0).sync.read().status().state, SyncState::Idle);
+	let mut config = SyncConfig::default();
+	config.warp_sync = true;
+	let net = TestNet::new_with_config(2, config);
+	assert_eq!(net.peer(0).sync.read().status().state, SyncState::WaitingPeers);
 }
 
 #[test]
diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs
index 202ab4f17..d5e07a936 100644
--- a/sync/src/tests/helpers.rs
+++ b/sync/src/tests/helpers.rs
@@ -127,20 +127,24 @@ pub struct TestNet {
 
 impl TestNet {
 	pub fn new(n: usize) -> TestNet {
-		Self::new_with_fork(n, None)
+		Self::new_with_config(n, SyncConfig::default())
 	}
 
 	pub fn new_with_fork(n: usize, fork: Option<(BlockNumber, H256)>) -> TestNet {
+		let mut config = SyncConfig::default();
+		config.fork_block = fork;
+		Self::new_with_config(n, config)
+	}
+
+	pub fn new_with_config(n: usize, config: SyncConfig) -> TestNet {
 		let mut net = TestNet {
 			peers: Vec::new(),
 			started: false,
 		};
 		for _ in 0..n {
 			let chain = TestBlockChainClient::new();
-			let mut config = SyncConfig::default();
-			config.fork_block = fork;
 			let ss = Arc::new(TestSnapshotService::new());
-			let sync = ChainSync::new(config, &chain);
+			let sync = ChainSync::new(config.clone(), &chain);
 			net.peers.push(TestPeer {
 				sync: RwLock::new(sync),
 				snapshot_service: ss,
@@ -164,7 +168,7 @@ impl TestNet {
 		for client in 0..self.peers.len() {
 			if peer != client {
 				let mut p = self.peers.get_mut(peer).unwrap();
-				p.sync.write().restart(&mut TestIo::new(&mut p.chain, &p.snapshot_service, &mut p.queue, Some(client as PeerId)));
+				p.sync.write().update_targets(&mut p.chain);
 				p.sync.write().on_peer_connected(&mut TestIo::new(&mut p.chain, &p.snapshot_service, &mut p.queue, Some(client as PeerId)), client as PeerId);
 			}
 		}
diff --git a/sync/src/tests/snapshot.rs b/sync/src/tests/snapshot.rs
index 813513e84..5d0b21b47 100644
--- a/sync/src/tests/snapshot.rs
+++ b/sync/src/tests/snapshot.rs
@@ -19,6 +19,7 @@ use ethcore::snapshot::{SnapshotService, ManifestData, RestorationStatus};
 use ethcore::header::BlockNumber;
 use ethcore::client::{EachBlockWith};
 use super::helpers::*;
+use SyncConfig;
 
 pub struct TestSnapshotService {
 	manifest: Option<ManifestData>,
@@ -122,11 +123,16 @@ impl SnapshotService for TestSnapshotService {
 #[test]
 fn snapshot_sync() {
 	::env_logger::init().ok();
-	let mut net = TestNet::new(2);
-	net.peer_mut(0).snapshot_service = Arc::new(TestSnapshotService::new_with_snapshot(16, H256::new(), 500000));
-	net.peer_mut(0).chain.add_blocks(1, EachBlockWith::Nothing);
-	net.sync_steps(19); // status + manifest + chunks
-	assert_eq!(net.peer(1).snapshot_service.state_restoration_chunks.lock().len(), net.peer(0).snapshot_service.manifest.as_ref().unwrap().state_hashes.len());
-	assert_eq!(net.peer(1).snapshot_service.block_restoration_chunks.lock().len(), net.peer(0).snapshot_service.manifest.as_ref().unwrap().block_hashes.len());
+	let mut config = SyncConfig::default();
+	config.warp_sync = true;
+	let mut net = TestNet::new_with_config(5, config);
+	let snapshot_service = Arc::new(TestSnapshotService::new_with_snapshot(16, H256::new(), 500000));
+	for i in 0..4 {
+		net.peer_mut(i).snapshot_service = snapshot_service.clone();
+		net.peer_mut(i).chain.add_blocks(1, EachBlockWith::Nothing);
+	}
+	net.sync_steps(50);
+	assert_eq!(net.peer(4).snapshot_service.state_restoration_chunks.lock().len(), net.peer(0).snapshot_service.manifest.as_ref().unwrap().state_hashes.len());
+	assert_eq!(net.peer(4).snapshot_service.block_restoration_chunks.lock().len(), net.peer(0).snapshot_service.manifest.as_ref().unwrap().block_hashes.len());
 }
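
Note on the new snapshot selection: maybe_start_snapshot_sync effectively runs a vote. Every allowed peer whose advertised snapshot is more than SNAPSHOT_RESTORE_THRESHOLD blocks ahead of the local best block votes for its snapshot hash; the hash with the most supporters wins, the download starts at once if it has at least SNAPSHOT_MIN_PEERS supporters, and otherwise only after WAIT_PEERS_TIMEOUT_SEC (an "unconfirmed" sync). The standalone Rust sketch below illustrates just that selection step; PeerSnapshot, select_snapshot and the plain [u8; 32] hashes are hypothetical stand-ins and not part of this patch.

// Simplified, self-contained sketch of the snapshot vote; not the patch code.
use std::collections::HashMap;

const SNAPSHOT_MIN_PEERS: usize = 3;
const SNAPSHOT_RESTORE_THRESHOLD: u64 = 100_000;

// Hypothetical stand-in for the per-peer data ChainSync keeps.
struct PeerSnapshot {
	snapshot_hash: [u8; 32],
	snapshot_number: u64,
}

// Return the snapshot hash advertised by the most peers that is far enough
// ahead of our best block, together with the indices of its supporters.
fn select_snapshot(best_block: u64, peers: &[PeerSnapshot]) -> Option<([u8; 32], Vec<usize>)> {
	let mut votes: HashMap<[u8; 32], Vec<usize>> = HashMap::new();
	for (id, peer) in peers.iter().enumerate() {
		if peer.snapshot_number > best_block && peer.snapshot_number - best_block > SNAPSHOT_RESTORE_THRESHOLD {
			votes.entry(peer.snapshot_hash).or_insert_with(Vec::new).push(id);
		}
	}
	// The hash with the largest supporter set wins the vote.
	votes.into_iter().max_by_key(|&(_, ref supporters)| supporters.len())
}

fn main() {
	let peers = vec![
		PeerSnapshot { snapshot_hash: [1; 32], snapshot_number: 600_000 },
		PeerSnapshot { snapshot_hash: [1; 32], snapshot_number: 600_000 },
		PeerSnapshot { snapshot_hash: [2; 32], snapshot_number: 550_000 },
	];
	if let Some((hash, supporters)) = select_snapshot(0, &peers) {
		// With fewer than SNAPSHOT_MIN_PEERS supporters the real sync would only
		// start after WAIT_PEERS_TIMEOUT_SEC, as an "unconfirmed" snapshot sync.
		println!("snapshot {:?}... backed by {} peer(s), confirmed: {}",
			&hash[..4], supporters.len(), supporters.len() >= SNAPSHOT_MIN_PEERS);
	}
}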