Fixed loosing peers on incoming connections. (#1293)
* Deactivate peer if it has no new data * Fixed node table timer registration * Fixed handshake timeout expiration * Extra trace * Fixed session count calculation * Only deactivate incapable peers in ChainHead state * Timer registration is not needed
This commit is contained in:
parent
549647b6f2
commit
7284df9bf5
@ -405,6 +405,7 @@ impl ChainSync {
|
||||
}
|
||||
if item_count == 0 && (self.state == SyncState::Blocks || self.state == SyncState::NewBlocks) {
|
||||
self.deactivate_peer(io, peer_id); //TODO: is this too harsh?
|
||||
self.continue_sync(io);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@ -452,7 +453,12 @@ impl ChainSync {
|
||||
|
||||
// Disable the peer for this syncing round if it gives invalid chain
|
||||
if !valid_response {
|
||||
trace!(target: "sync", "{} Disabled for invalid headers response", peer_id);
|
||||
trace!(target: "sync", "{} Deactivated for invalid headers response", peer_id);
|
||||
self.deactivate_peer(io, peer_id);
|
||||
}
|
||||
if headers.is_empty() && self.state == SyncState::ChainHead {
|
||||
// Peer does not have any new subchain heads, deactivate it nd try with another
|
||||
trace!(target: "sync", "{} Deactivated for no data", peer_id);
|
||||
self.deactivate_peer(io, peer_id);
|
||||
}
|
||||
match self.state {
|
||||
|
@ -128,7 +128,6 @@ impl Handshake {
|
||||
/// Readable IO handler. Drives the state change.
|
||||
pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), UtilError> where Message: Send + Clone {
|
||||
if !self.expired() {
|
||||
io.clear_timer(self.connection.token).ok();
|
||||
match self.state {
|
||||
HandshakeState::New => {}
|
||||
HandshakeState::ReadingAuth => {
|
||||
@ -151,7 +150,9 @@ impl Handshake {
|
||||
try!(self.read_ack_eip8(host.secret(), &data));
|
||||
};
|
||||
},
|
||||
HandshakeState::StartSession => {},
|
||||
HandshakeState::StartSession => {
|
||||
io.clear_timer(self.connection.token).ok();
|
||||
},
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
@ -523,11 +523,13 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||
}
|
||||
|
||||
let nodes = if pin { self.pinned_nodes.clone() } else { self.nodes.read().unwrap().nodes() };
|
||||
let mut started: usize = 0;
|
||||
for id in nodes.iter().filter(|ref id| !self.have_session(id) && !self.connecting_to(id))
|
||||
.take(min(MAX_HANDSHAKES_PER_ROUND, handshake_limit - handshake_count)) {
|
||||
self.connect_peer(&id, io);
|
||||
started += 1;
|
||||
}
|
||||
debug!(target: "network", "Connecting peers: {} sessions, {} pending", self.session_count(), self.handshake_count());
|
||||
debug!(target: "network", "Connecting peers: {} sessions, {} pending, {} started", self.session_count(), self.handshake_count(), started);
|
||||
}
|
||||
|
||||
#[cfg_attr(feature="dev", allow(single_match))]
|
||||
@ -650,6 +652,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||
break;
|
||||
},
|
||||
Ok(SessionData::Ready) => {
|
||||
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
|
||||
if !s.info.originated {
|
||||
let session_count = self.session_count();
|
||||
let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers };
|
||||
@ -667,7 +670,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
||||
}
|
||||
}
|
||||
}
|
||||
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
|
||||
for (p, _) in self.handlers.read().unwrap().iter() {
|
||||
if s.have_capability(p) {
|
||||
ready_data.push(p);
|
||||
@ -828,6 +830,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
||||
io.update_registration(DISCOVERY).expect("Error updating discovery registration");
|
||||
},
|
||||
NODE_TABLE => {
|
||||
trace!(target: "network", "Refreshing node table");
|
||||
self.nodes.write().unwrap().clear_useless();
|
||||
},
|
||||
_ => match self.timers.read().unwrap().get(&token).cloned() {
|
||||
|
Loading…
Reference in New Issue
Block a user