Fixed loosing peers on incoming connections. (#1293)

* Deactivate peer if it has no new data

* Fixed node table timer registration

* Fixed handshake timeout expiration

* Extra trace

* Fixed session count calculation

* Only deactivate incapable peers in ChainHead state

* Timer registration is not needed
This commit is contained in:
Arkadiy Paronyan 2016-06-15 19:01:58 +02:00 committed by arkpar
parent 549647b6f2
commit 29e286572c
3 changed files with 15 additions and 5 deletions

View File

@ -405,6 +405,7 @@ impl ChainSync {
} }
if item_count == 0 && (self.state == SyncState::Blocks || self.state == SyncState::NewBlocks) { if item_count == 0 && (self.state == SyncState::Blocks || self.state == SyncState::NewBlocks) {
self.deactivate_peer(io, peer_id); //TODO: is this too harsh? self.deactivate_peer(io, peer_id); //TODO: is this too harsh?
self.continue_sync(io);
return Ok(()); return Ok(());
} }
@ -452,7 +453,12 @@ impl ChainSync {
// Disable the peer for this syncing round if it gives invalid chain // Disable the peer for this syncing round if it gives invalid chain
if !valid_response { if !valid_response {
trace!(target: "sync", "{} Disabled for invalid headers response", peer_id); trace!(target: "sync", "{} Deactivated for invalid headers response", peer_id);
self.deactivate_peer(io, peer_id);
}
if headers.is_empty() {
// Peer does not have any new subchain heads, deactivate it nd try with another
trace!(target: "sync", "{} Deactivated for no data", peer_id);
self.deactivate_peer(io, peer_id); self.deactivate_peer(io, peer_id);
} }
match self.state { match self.state {

View File

@ -128,7 +128,6 @@ impl Handshake {
/// Readable IO handler. Drives the state change. /// Readable IO handler. Drives the state change.
pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), UtilError> where Message: Send + Clone { pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), UtilError> where Message: Send + Clone {
if !self.expired() { if !self.expired() {
io.clear_timer(self.connection.token).ok();
match self.state { match self.state {
HandshakeState::New => {} HandshakeState::New => {}
HandshakeState::ReadingAuth => { HandshakeState::ReadingAuth => {
@ -151,7 +150,9 @@ impl Handshake {
try!(self.read_ack_eip8(host.secret(), &data)); try!(self.read_ack_eip8(host.secret(), &data));
}; };
}, },
HandshakeState::StartSession => {}, HandshakeState::StartSession => {
io.clear_timer(self.connection.token).ok();
},
} }
} }
Ok(()) Ok(())

View File

@ -523,11 +523,13 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
} }
let nodes = if pin { self.pinned_nodes.clone() } else { self.nodes.read().unwrap().nodes() }; let nodes = if pin { self.pinned_nodes.clone() } else { self.nodes.read().unwrap().nodes() };
let mut started: usize = 0;
for id in nodes.iter().filter(|ref id| !self.have_session(id) && !self.connecting_to(id)) for id in nodes.iter().filter(|ref id| !self.have_session(id) && !self.connecting_to(id))
.take(min(MAX_HANDSHAKES_PER_ROUND, handshake_limit - handshake_count)) { .take(min(MAX_HANDSHAKES_PER_ROUND, handshake_limit - handshake_count)) {
self.connect_peer(&id, io); self.connect_peer(&id, io);
started += 1;
} }
debug!(target: "network", "Connecting peers: {} sessions, {} pending", self.session_count(), self.handshake_count()); debug!(target: "network", "Connecting peers: {} sessions, {} pending, {} started", self.session_count(), self.handshake_count(), started);
} }
#[cfg_attr(feature="dev", allow(single_match))] #[cfg_attr(feature="dev", allow(single_match))]
@ -650,6 +652,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
break; break;
}, },
Ok(SessionData::Ready) => { Ok(SessionData::Ready) => {
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
if !s.info.originated { if !s.info.originated {
let session_count = self.session_count(); let session_count = self.session_count();
let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers }; let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers };
@ -667,7 +670,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
} }
} }
} }
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
for (p, _) in self.handlers.read().unwrap().iter() { for (p, _) in self.handlers.read().unwrap().iter() {
if s.have_capability(p) { if s.have_capability(p) {
ready_data.push(p); ready_data.push(p);
@ -828,6 +830,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
io.update_registration(DISCOVERY).expect("Error updating discovery registration"); io.update_registration(DISCOVERY).expect("Error updating discovery registration");
}, },
NODE_TABLE => { NODE_TABLE => {
trace!(target: "network", "Refreshing node table");
self.nodes.write().unwrap().clear_useless(); self.nodes.write().unwrap().clear_useless();
}, },
_ => match self.timers.read().unwrap().get(&token).cloned() { _ => match self.timers.read().unwrap().get(&token).cloned() {