From 11314a660d2c584f7b6d7a718f09120fbf8f0b7a Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Wed, 22 Jun 2016 12:10:26 +0200 Subject: [PATCH] Sync optimization (#1385) * Minor sync fixes * Fixed session count sub * handle NewBlock when downloading * Accept new blocks right away * block collection update fixed --- sync/src/blocks.rs | 25 ++++++++++- sync/src/chain.rs | 91 ++++++++++++++++++++-------------------- util/src/network/host.rs | 6 +-- 3 files changed, 72 insertions(+), 50 deletions(-) diff --git a/sync/src/blocks.rs b/sync/src/blocks.rs index acc6703d5..b48085d43 100644 --- a/sync/src/blocks.rs +++ b/sync/src/blocks.rs @@ -295,6 +295,10 @@ impl BlockCollection { let old_subchains: HashSet<_> = { self.heads.iter().cloned().collect() }; for s in self.heads.drain(..) { let mut h = s.clone(); + if !self.blocks.contains_key(&h) { + new_heads.push(h); + continue; + } loop { match self.parents.get(&h) { Some(next) => { @@ -394,7 +398,7 @@ mod test { assert_eq!(&bc.drain()[..], &blocks[6..16]); assert_eq!(hashes[15], bc.heads[0]); - bc.insert_headers(headers[16..].to_vec()); + bc.insert_headers(headers[15..].to_vec()); bc.drain(); assert!(bc.is_empty()); } @@ -420,5 +424,24 @@ mod test { assert!(bc.head.is_some()); assert_eq!(hashes[21], bc.heads[0]); } + + #[test] + fn insert_headers_no_gap() { + let mut bc = BlockCollection::new(); + assert!(is_empty(&bc)); + let client = TestBlockChainClient::new(); + let nblocks = 200; + client.add_blocks(nblocks, EachBlockWith::Nothing); + let blocks: Vec<_> = (0 .. nblocks).map(|i| (&client as &BlockChainClient).block(BlockID::Number(i as BlockNumber)).unwrap()).collect(); + let headers: Vec<_> = blocks.iter().map(|b| Rlp::new(b).at(0).as_raw().to_vec()).collect(); + let hashes: Vec<_> = headers.iter().map(|h| HeaderView::new(h).sha3()).collect(); + let heads: Vec<_> = hashes.iter().enumerate().filter_map(|(i, h)| if i % 20 == 0 { Some(h.clone()) } else { None }).collect(); + bc.reset_to(heads); + + bc.insert_headers(headers[1..2].to_vec()); + assert!(bc.drain().is_empty()); + bc.insert_headers(headers[0..1].to_vec()); + assert_eq!(bc.drain().len(), 2); + } } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 01640ec4d..55e4e93b2 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -100,6 +100,7 @@ use io::SyncIo; use time; use super::SyncConfig; use blocks::BlockCollection; +use rand::{thread_rng, Rng}; known_heap_size!(0, PeerInfo); @@ -308,7 +309,6 @@ impl ChainSync { } self.syncing_difficulty = From::from(0u64); self.state = SyncState::Idle; - self.blocks.clear(); self.active_peers = self.peers.keys().cloned().collect(); } @@ -393,7 +393,7 @@ impl ChainSync { self.clear_peer_download(peer_id); let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash); let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders }; - if !self.reset_peer_asking(peer_id, expected_asking) { + if !self.reset_peer_asking(peer_id, expected_asking) || expected_hash.is_none() { trace!(target: "sync", "Ignored unexpected headers"); self.continue_sync(io); return Ok(()); @@ -533,10 +533,6 @@ impl ChainSync { let header_rlp = try!(block_rlp.at(0)); let h = header_rlp.as_raw().sha3(); trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h); - if self.state != SyncState::Idle { - trace!(target: "sync", "NewBlock ignored while seeking"); - return Ok(()); - } let header: BlockHeader = try!(header_rlp.as_val()); let mut unknown = false; { @@ -544,46 +540,45 @@ impl ChainSync { peer.latest_hash = header.hash(); peer.latest_number = Some(header.number()); } - if header.number <= self.last_imported_block + 1 { - match io.chain().import_block(block_rlp.as_raw().to_vec()) { - Err(Error::Import(ImportError::AlreadyInChain)) => { - trace!(target: "sync", "New block already in chain {:?}", h); - }, - Err(Error::Import(ImportError::AlreadyQueued)) => { - trace!(target: "sync", "New block already queued {:?}", h); - }, - Ok(_) => { - if header.number == self.last_imported_block + 1 { - self.last_imported_block = header.number; - self.last_imported_hash = header.hash(); - } - trace!(target: "sync", "New block queued {:?} ({})", h, header.number); - }, - Err(Error::Block(BlockError::UnknownParent(p))) => { - unknown = true; - trace!(target: "sync", "New block with unknown parent ({:?}) {:?}", p, h); - }, - Err(e) => { - debug!(target: "sync", "Bad new block {:?} : {:?}", h, e); - io.disable_peer(peer_id); - } - }; - } - else { - unknown = true; - } - if unknown { - trace!(target: "sync", "New unknown block {:?}", h); - //TODO: handle too many unknown blocks - let difficulty: U256 = try!(r.val_at(1)); - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { - if peer.difficulty.map_or(true, |pd| difficulty > pd) { - //self.state = SyncState::ChainHead; - peer.difficulty = Some(difficulty); - trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); + match io.chain().import_block(block_rlp.as_raw().to_vec()) { + Err(Error::Import(ImportError::AlreadyInChain)) => { + trace!(target: "sync", "New block already in chain {:?}", h); + }, + Err(Error::Import(ImportError::AlreadyQueued)) => { + trace!(target: "sync", "New block already queued {:?}", h); + }, + Ok(_) => { + if header.number == self.last_imported_block + 1 { + self.last_imported_block = header.number; + self.last_imported_hash = header.hash(); } + trace!(target: "sync", "New block queued {:?} ({})", h, header.number); + }, + Err(Error::Block(BlockError::UnknownParent(p))) => { + unknown = true; + trace!(target: "sync", "New block with unknown parent ({:?}) {:?}", p, h); + }, + Err(e) => { + debug!(target: "sync", "Bad new block {:?} : {:?}", h, e); + io.disable_peer(peer_id); + } + }; + if unknown { + if self.state != SyncState::Idle { + trace!(target: "sync", "NewBlock ignored while seeking"); + } else { + trace!(target: "sync", "New unknown block {:?}", h); + //TODO: handle too many unknown blocks + let difficulty: U256 = try!(r.val_at(1)); + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + if peer.difficulty.map_or(true, |pd| difficulty > pd) { + //self.state = SyncState::ChainHead; + peer.difficulty = Some(difficulty); + trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); + } + } + self.sync_peer(io, peer_id, true); } - self.sync_peer(io, peer_id, true); } Ok(()) } @@ -661,7 +656,7 @@ impl ChainSync { /// Resume downloading fn continue_sync(&mut self, io: &mut SyncIo) { let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty.unwrap_or_else(U256::zero))).collect(); - peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse()); //TODO: sort by rating + thread_rng().shuffle(&mut peers); //TODO: sort by rating trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len()); for (p, _) in peers { if self.active_peers.contains(&p) { @@ -687,7 +682,11 @@ impl ChainSync { } /// Find something to do for a peer. Called for a new peer or when a peer is done with it's task. - fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) { + fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) { + if !self.active_peers.contains(&peer_id) { + trace!(target: "sync", "Skipping deactivated peer"); + return; + } let (peer_latest, peer_difficulty) = { let peer = self.peers.get_mut(&peer_id).unwrap(); if peer.asking != PeerAsking::Nothing { diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 03ba07544..7de581b4d 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -191,7 +191,7 @@ pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'sta sessions: Arc>>, session: Option, session_id: Option, - reserved_peers: &'s HashSet, + _reserved_peers: &'s HashSet, } impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, { @@ -207,7 +207,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone session_id: id, session: session, sessions: sessions, - reserved_peers: reserved_peers, + _reserved_peers: reserved_peers, } } @@ -837,9 +837,9 @@ impl Host where Message: Send + Sync + Clone { let mut s = session.lock().unwrap(); if !s.expired() { if s.is_ready() { + self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst); for (p, _) in self.handlers.read().unwrap().iter() { if s.have_capability(p) { - self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst); to_disconnect.push(p); } }