Merge pull request #685 from ethcore/sync

More sync fixes
This commit is contained in:
Gav Wood 2016-03-13 12:14:24 +01:00
commit 94932386e3
2 changed files with 33 additions and 30 deletions

View File

@ -298,8 +298,6 @@ impl ChainSync {
/// Restart sync /// Restart sync
pub fn restart(&mut self, io: &mut SyncIo) { pub fn restart(&mut self, io: &mut SyncIo) {
self.reset(); self.reset();
self.last_imported_block = None;
self.last_imported_hash = None;
self.starting_block = 0; self.starting_block = 0;
self.highest_block = None; self.highest_block = None;
self.have_common_block = false; self.have_common_block = false;
@ -366,7 +364,7 @@ impl ChainSync {
for i in 0..item_count { for i in 0..item_count {
let info: BlockHeader = try!(r.val_at(i)); let info: BlockHeader = try!(r.val_at(i));
let number = BlockNumber::from(info.number); let number = BlockNumber::from(info.number);
if number <= self.current_base_block() || self.headers.have_item(&number) { if (number <= self.current_base_block() && self.have_common_block) || self.headers.have_item(&number) {
trace!(target: "sync", "Skipping existing block header"); trace!(target: "sync", "Skipping existing block header");
continue; continue;
} }
@ -376,11 +374,17 @@ impl ChainSync {
} }
let hash = info.hash(); let hash = info.hash();
match io.chain().block_status(BlockId::Hash(hash.clone())) { match io.chain().block_status(BlockId::Hash(hash.clone())) {
BlockStatus::InChain => { BlockStatus::InChain | BlockStatus::Queued => {
self.have_common_block = true; if !self.have_common_block || self.current_base_block() < number {
self.last_imported_block = Some(number); self.last_imported_block = Some(number);
self.last_imported_hash = Some(hash.clone()); self.last_imported_hash = Some(hash.clone());
}
if !self.have_common_block {
self.have_common_block = true;
trace!(target: "sync", "Found common header {} ({})", number, hash); trace!(target: "sync", "Found common header {} ({})", number, hash);
} else {
trace!(target: "sync", "Header already in chain {} ({})", number, hash);
}
}, },
_ => { _ => {
if self.have_common_block { if self.have_common_block {
@ -588,7 +592,7 @@ impl ChainSync {
pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) { pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) {
trace!(target: "sync", "== Connected {}", peer); trace!(target: "sync", "== Connected {}", peer);
if let Err(e) = self.send_status(io) { if let Err(e) = self.send_status(io) {
warn!(target:"sync", "Error sending status request: {:?}", e); debug!(target:"sync", "Error sending status request: {:?}", e);
io.disable_peer(peer); io.disable_peer(peer);
} }
} }
@ -656,10 +660,7 @@ impl ChainSync {
let mut needed_numbers: Vec<BlockNumber> = Vec::new(); let mut needed_numbers: Vec<BlockNumber> = Vec::new();
if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.current_base_block() + 1 { if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.current_base_block() + 1 {
for (start, ref items) in self.headers.range_iter() { if let Some((start, ref items)) = self.headers.range_iter().next() {
if needed_bodies.len() >= MAX_BODIES_TO_REQUEST {
break;
}
let mut index: BlockNumber = 0; let mut index: BlockNumber = 0;
while index != items.len() as BlockNumber && needed_bodies.len() < MAX_BODIES_TO_REQUEST { while index != items.len() as BlockNumber && needed_bodies.len() < MAX_BODIES_TO_REQUEST {
let block = start + index; let block = start + index;
@ -703,7 +704,10 @@ impl ChainSync {
if !self.have_common_block { if !self.have_common_block {
// download backwards until common block is found 1 header at a time // download backwards until common block is found 1 header at a time
let chain_info = io.chain().chain_info(); let chain_info = io.chain().chain_info();
start = chain_info.best_block_number; start = match self.last_imported_block {
Some(n) => n,
None => chain_info.best_block_number,
};
if !self.headers.is_empty() { if !self.headers.is_empty() {
start = min(start, self.headers.range_iter().next().unwrap().0 - 1); start = min(start, self.headers.range_iter().next().unwrap().0 - 1);
} }
@ -844,18 +848,12 @@ impl ChainSync {
/// Remove downloaded bocks/headers starting from specified number. /// Remove downloaded bocks/headers starting from specified number.
/// Used to recover from an error and re-download parts of the chain detected as bad. /// Used to recover from an error and re-download parts of the chain detected as bad.
fn remove_downloaded_blocks(&mut self, start: BlockNumber) { fn remove_downloaded_blocks(&mut self, start: BlockNumber) {
for n in self.headers.get_tail(&start) { let ids = self.header_ids.drain().filter(|&(_, v)| v < start).collect();
if let Some(ref header_data) = self.headers.find_item(&n) { self.header_ids = ids;
let header_to_delete = HeaderView::new(&header_data.data); let hdrs = self.downloading_headers.drain().filter(|v| *v < start).collect();
let header_id = HeaderId { self.downloading_headers = hdrs;
transactions_root: header_to_delete.transactions_root(), let bodies = self.downloading_bodies.drain().filter(|v| *v < start).collect();
uncles: header_to_delete.uncles_hash() self.downloading_bodies = bodies;
};
self.header_ids.remove(&header_id);
}
self.downloading_bodies.remove(&n);
self.downloading_headers.remove(&n);
}
self.headers.remove_from(&start); self.headers.remove_from(&start);
self.bodies.remove_from(&start); self.bodies.remove_from(&start);
} }
@ -1095,7 +1093,7 @@ impl ChainSync {
let rlp = UntrustedRlp::new(data); let rlp = UntrustedRlp::new(data);
if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) { if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) {
warn!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer)); debug!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer));
return; return;
} }
let result = match packet_id { let result = match packet_id {

View File

@ -300,12 +300,17 @@ fn test_range() {
let mut r = ranges.clone(); let mut r = ranges.clone();
r.remove_from(&20); r.remove_from(&20);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
r.remove_from(&17); r.remove_from(&18);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p'][..])]), Ordering::Equal); assert!(!r.have_item(&18));
r.remove_from(&15); assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q'][..])]), Ordering::Equal);
r.remove_from(&16);
assert!(!r.have_item(&16));
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal); assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal);
r.remove_from(&3); r.remove_from(&3);
assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal); assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal);
r.remove_from(&1);
assert_eq!(r.range_iter().next(), None);
let mut r = ranges.clone();
r.remove_from(&2); r.remove_from(&2);
assert_eq!(r.range_iter().next(), None); assert_eq!(r.range_iter().next(), None);
} }