Sync optimization (#1385)

* Minor sync fixes

* Fixed session count sub

* handle NewBlock when downloading

* Accept new blocks right away

* block collection update fixed
This commit is contained in:
Arkadiy Paronyan 2016-06-22 12:10:26 +02:00 committed by Gav Wood
parent e2de1987c7
commit 11314a660d
3 changed files with 72 additions and 50 deletions

View File

@ -295,6 +295,10 @@ impl BlockCollection {
let old_subchains: HashSet<_> = { self.heads.iter().cloned().collect() }; let old_subchains: HashSet<_> = { self.heads.iter().cloned().collect() };
for s in self.heads.drain(..) { for s in self.heads.drain(..) {
let mut h = s.clone(); let mut h = s.clone();
if !self.blocks.contains_key(&h) {
new_heads.push(h);
continue;
}
loop { loop {
match self.parents.get(&h) { match self.parents.get(&h) {
Some(next) => { Some(next) => {
@ -394,7 +398,7 @@ mod test {
assert_eq!(&bc.drain()[..], &blocks[6..16]); assert_eq!(&bc.drain()[..], &blocks[6..16]);
assert_eq!(hashes[15], bc.heads[0]); assert_eq!(hashes[15], bc.heads[0]);
bc.insert_headers(headers[16..].to_vec()); bc.insert_headers(headers[15..].to_vec());
bc.drain(); bc.drain();
assert!(bc.is_empty()); assert!(bc.is_empty());
} }
@ -420,5 +424,24 @@ mod test {
assert!(bc.head.is_some()); assert!(bc.head.is_some());
assert_eq!(hashes[21], bc.heads[0]); assert_eq!(hashes[21], bc.heads[0]);
} }
// Regression test for the head-update fix above: inserting headers that leave a
// gap must not make any block drainable until the missing parent header arrives.
#[test]
fn insert_headers_no_gap() {
let mut bc = BlockCollection::new();
// `is_empty` — presumably a free helper in this test module (note other tests
// call `bc.is_empty()` directly); verify it exists.
assert!(is_empty(&bc));
let client = TestBlockChainClient::new();
let nblocks = 200;
client.add_blocks(nblocks, EachBlockWith::Nothing);
// Fetch the raw RLP of blocks 0..nblocks from the test client by number.
let blocks: Vec<_> = (0 .. nblocks).map(|i| (&client as &BlockChainClient).block(BlockID::Number(i as BlockNumber)).unwrap()).collect();
// The first RLP item of each block is its header; keep the raw header bytes.
let headers: Vec<_> = blocks.iter().map(|b| Rlp::new(b).at(0).as_raw().to_vec()).collect();
// Header hashes, used both as subchain heads and as parent links.
let hashes: Vec<_> = headers.iter().map(|h| HeaderView::new(h).sha3()).collect();
// Every 20th hash becomes a subchain head for the collection.
let heads: Vec<_> = hashes.iter().enumerate().filter_map(|(i, h)| if i % 20 == 0 { Some(h.clone()) } else { None }).collect();
bc.reset_to(heads);
// Insert header 1 alone: its parent (header 0) is missing, so draining must
// yield nothing — the subchain cannot advance across the gap.
bc.insert_headers(headers[1..2].to_vec());
assert!(bc.drain().is_empty());
// Insert header 0, closing the gap: both blocks now become drainable.
bc.insert_headers(headers[0..1].to_vec());
assert_eq!(bc.drain().len(), 2);
}
} }

View File

@ -100,6 +100,7 @@ use io::SyncIo;
use time; use time;
use super::SyncConfig; use super::SyncConfig;
use blocks::BlockCollection; use blocks::BlockCollection;
use rand::{thread_rng, Rng};
known_heap_size!(0, PeerInfo); known_heap_size!(0, PeerInfo);
@ -308,7 +309,6 @@ impl ChainSync {
} }
self.syncing_difficulty = From::from(0u64); self.syncing_difficulty = From::from(0u64);
self.state = SyncState::Idle; self.state = SyncState::Idle;
self.blocks.clear();
self.active_peers = self.peers.keys().cloned().collect(); self.active_peers = self.peers.keys().cloned().collect();
} }
@ -393,7 +393,7 @@ impl ChainSync {
self.clear_peer_download(peer_id); self.clear_peer_download(peer_id);
let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash); let expected_hash = self.peers.get(&peer_id).and_then(|p| p.asking_hash);
let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders }; let expected_asking = if self.state == SyncState::ChainHead { PeerAsking::Heads } else { PeerAsking::BlockHeaders };
if !self.reset_peer_asking(peer_id, expected_asking) { if !self.reset_peer_asking(peer_id, expected_asking) || expected_hash.is_none() {
trace!(target: "sync", "Ignored unexpected headers"); trace!(target: "sync", "Ignored unexpected headers");
self.continue_sync(io); self.continue_sync(io);
return Ok(()); return Ok(());
@ -533,10 +533,6 @@ impl ChainSync {
let header_rlp = try!(block_rlp.at(0)); let header_rlp = try!(block_rlp.at(0));
let h = header_rlp.as_raw().sha3(); let h = header_rlp.as_raw().sha3();
trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h); trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h);
if self.state != SyncState::Idle {
trace!(target: "sync", "NewBlock ignored while seeking");
return Ok(());
}
let header: BlockHeader = try!(header_rlp.as_val()); let header: BlockHeader = try!(header_rlp.as_val());
let mut unknown = false; let mut unknown = false;
{ {
@ -544,7 +540,6 @@ impl ChainSync {
peer.latest_hash = header.hash(); peer.latest_hash = header.hash();
peer.latest_number = Some(header.number()); peer.latest_number = Some(header.number());
} }
if header.number <= self.last_imported_block + 1 {
match io.chain().import_block(block_rlp.as_raw().to_vec()) { match io.chain().import_block(block_rlp.as_raw().to_vec()) {
Err(Error::Import(ImportError::AlreadyInChain)) => { Err(Error::Import(ImportError::AlreadyInChain)) => {
trace!(target: "sync", "New block already in chain {:?}", h); trace!(target: "sync", "New block already in chain {:?}", h);
@ -568,11 +563,10 @@ impl ChainSync {
io.disable_peer(peer_id); io.disable_peer(peer_id);
} }
}; };
}
else {
unknown = true;
}
if unknown { if unknown {
if self.state != SyncState::Idle {
trace!(target: "sync", "NewBlock ignored while seeking");
} else {
trace!(target: "sync", "New unknown block {:?}", h); trace!(target: "sync", "New unknown block {:?}", h);
//TODO: handle too many unknown blocks //TODO: handle too many unknown blocks
let difficulty: U256 = try!(r.val_at(1)); let difficulty: U256 = try!(r.val_at(1));
@ -585,6 +579,7 @@ impl ChainSync {
} }
self.sync_peer(io, peer_id, true); self.sync_peer(io, peer_id, true);
} }
}
Ok(()) Ok(())
} }
@ -661,7 +656,7 @@ impl ChainSync {
/// Resume downloading /// Resume downloading
fn continue_sync(&mut self, io: &mut SyncIo) { fn continue_sync(&mut self, io: &mut SyncIo) {
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty.unwrap_or_else(U256::zero))).collect(); let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty.unwrap_or_else(U256::zero))).collect();
peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse()); //TODO: sort by rating thread_rng().shuffle(&mut peers); //TODO: sort by rating
trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len()); trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
for (p, _) in peers { for (p, _) in peers {
if self.active_peers.contains(&p) { if self.active_peers.contains(&p) {
@ -688,6 +683,10 @@ impl ChainSync {
/// Find something to do for a peer. Called for a new peer or when a peer is done with it's task. /// Find something to do for a peer. Called for a new peer or when a peer is done with it's task.
fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) { fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) {
if !self.active_peers.contains(&peer_id) {
trace!(target: "sync", "Skipping deactivated peer");
return;
}
let (peer_latest, peer_difficulty) = { let (peer_latest, peer_difficulty) = {
let peer = self.peers.get_mut(&peer_id).unwrap(); let peer = self.peers.get_mut(&peer_id).unwrap();
if peer.asking != PeerAsking::Nothing { if peer.asking != PeerAsking::Nothing {

View File

@ -191,7 +191,7 @@ pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'sta
sessions: Arc<RwLock<Slab<SharedSession>>>, sessions: Arc<RwLock<Slab<SharedSession>>>,
session: Option<SharedSession>, session: Option<SharedSession>,
session_id: Option<StreamToken>, session_id: Option<StreamToken>,
reserved_peers: &'s HashSet<NodeId>, _reserved_peers: &'s HashSet<NodeId>,
} }
impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, { impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, {
@ -207,7 +207,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
session_id: id, session_id: id,
session: session, session: session,
sessions: sessions, sessions: sessions,
reserved_peers: reserved_peers, _reserved_peers: reserved_peers,
} }
} }
@ -837,9 +837,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let mut s = session.lock().unwrap(); let mut s = session.lock().unwrap();
if !s.expired() { if !s.expired() {
if s.is_ready() { if s.is_ready() {
self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst);
for (p, _) in self.handlers.read().unwrap().iter() { for (p, _) in self.handlers.read().unwrap().iter() {
if s.have_capability(p) { if s.have_capability(p) {
self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst);
to_disconnect.push(p); to_disconnect.push(p);
} }
} }