Syncing fix (#1320)
* Fixed aborting peer for expired session * Don't ban for usesless
This commit is contained in:
parent
5e1e3ce857
commit
ec654feaf8
@ -396,7 +396,7 @@ impl ChainSync {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
let item_count = r.item_count();
|
let item_count = r.item_count();
|
||||||
trace!(target: "sync", "{} -> BlockHeaders ({} entries)", peer_id, item_count);
|
trace!(target: "sync", "{} -> BlockHeaders ({} entries), state = {:?}", peer_id, item_count, self.state);
|
||||||
if self.state == SyncState::Idle {
|
if self.state == SyncState::Idle {
|
||||||
trace!(target: "sync", "Ignored unexpected block headers");
|
trace!(target: "sync", "Ignored unexpected block headers");
|
||||||
self.continue_sync(io);
|
self.continue_sync(io);
|
||||||
@ -655,6 +655,7 @@ impl ChainSync {
|
|||||||
fn continue_sync(&mut self, io: &mut SyncIo) {
|
fn continue_sync(&mut self, io: &mut SyncIo) {
|
||||||
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty.unwrap_or_else(U256::zero))).collect();
|
let mut peers: Vec<(PeerId, U256)> = self.peers.iter().map(|(k, p)| (*k, p.difficulty.unwrap_or_else(U256::zero))).collect();
|
||||||
peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse()); //TODO: sort by rating
|
peers.sort_by(|&(_, d1), &(_, d2)| d1.cmp(&d2).reverse()); //TODO: sort by rating
|
||||||
|
trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len());
|
||||||
for (p, _) in peers {
|
for (p, _) in peers {
|
||||||
if self.active_peers.contains(&p) {
|
if self.active_peers.contains(&p) {
|
||||||
self.sync_peer(io, p, false);
|
self.sync_peer(io, p, false);
|
||||||
@ -737,6 +738,8 @@ impl ChainSync {
|
|||||||
None => {
|
None => {
|
||||||
// TODO: get hash by number from the block queue
|
// TODO: get hash by number from the block queue
|
||||||
trace!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash);
|
trace!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash);
|
||||||
|
// just wait for full sync to complete
|
||||||
|
self.pause_sync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1128,14 +1131,19 @@ impl ChainSync {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn maintain_peers(&self, io: &mut SyncIo) {
|
pub fn maintain_peers(&mut self, io: &mut SyncIo) {
|
||||||
let tick = time::precise_time_s();
|
let tick = time::precise_time_s();
|
||||||
|
let mut aborting = Vec::new();
|
||||||
for (peer_id, peer) in &self.peers {
|
for (peer_id, peer) in &self.peers {
|
||||||
if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC {
|
if peer.asking != PeerAsking::Nothing && (tick - peer.ask_time) > CONNECTION_TIMEOUT_SEC {
|
||||||
trace!(target:"sync", "Timeout {}", peer_id);
|
trace!(target:"sync", "Timeout {}", peer_id);
|
||||||
io.disconnect_peer(*peer_id);
|
io.disconnect_peer(*peer_id);
|
||||||
|
aborting.push(*peer_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for p in aborting {
|
||||||
|
self.on_peer_aborting(io, p);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn check_resume(&mut self, io: &mut SyncIo) {
|
fn check_resume(&mut self, io: &mut SyncIo) {
|
||||||
|
@ -139,6 +139,7 @@ impl<Socket: GenericSocket> GenericConnection<Socket> {
|
|||||||
},
|
},
|
||||||
Ok(Some(size)) if (buf.position() as usize) == send_size => {
|
Ok(Some(size)) if (buf.position() as usize) == send_size => {
|
||||||
self.stats.inc_send(size);
|
self.stats.inc_send(size);
|
||||||
|
trace!(target:"network", "{}: Wrote {} bytes", self.token, send_size);
|
||||||
Ok(WriteStatus::Complete)
|
Ok(WriteStatus::Complete)
|
||||||
},
|
},
|
||||||
Ok(Some(_)) => { panic!("Wrote past buffer");},
|
Ok(Some(_)) => { panic!("Wrote past buffer");},
|
||||||
|
@ -250,7 +250,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
|
|||||||
self.io.message(NetworkIoMessage::Disconnect(peer));
|
self.io.message(NetworkIoMessage::Disconnect(peer));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sheck if the session is till active.
|
/// Check if the session is still active.
|
||||||
pub fn is_expired(&self) -> bool {
|
pub fn is_expired(&self) -> bool {
|
||||||
self.session.as_ref().map(|s| s.lock().unwrap().expired()).unwrap_or(false)
|
self.session.as_ref().map(|s| s.lock().unwrap().expired()).unwrap_or(false)
|
||||||
}
|
}
|
||||||
@ -664,7 +664,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
Err(e) => {
|
Err(e) => {
|
||||||
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
|
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
|
||||||
match e {
|
match e {
|
||||||
UtilError::Network(NetworkError::Disconnect(DisconnectReason::UselessPeer)) |
|
|
||||||
UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) => {
|
UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) => {
|
||||||
if let Some(id) = s.id() {
|
if let Some(id) = s.id() {
|
||||||
self.nodes.write().unwrap().mark_as_useless(id);
|
self.nodes.write().unwrap().mark_as_useless(id);
|
||||||
|
Loading…
Reference in New Issue
Block a user