Small refactoring

This commit is contained in:
arkpar
2016-02-14 01:03:48 +01:00
parent 62b9f4b91d
commit 76ea030b78
8 changed files with 257 additions and 267 deletions

View File

@@ -32,18 +32,18 @@ use network::session::{Session, SessionData};
use error::*;
use io::*;
use network::{NetworkProtocolHandler, PROTOCOL_VERSION};
use network::node::*;
use network::node_table::*;
use network::stats::NetworkStats;
use network::error::DisconnectReason;
use igd::{PortMappingProtocol,search_gateway};
use network::discovery::{Discovery, TableUpdates, NodeEntry};
use network::node_table::NodeTable;
type Slab<T> = ::slab::Slab<T, usize>;
const _DEFAULT_PORT: u16 = 30304;
const MAX_CONNECTIONS: usize = 1024;
const MAINTENANCE_TIMEOUT: u64 = 1000;
const MAX_HANDSHAKES: usize = 100;
#[derive(Debug)]
/// Network service configuration
@@ -226,7 +226,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
_ => warn!(target: "net", "Send: Peer is not connected yet")
}
} else {
warn!(target: "net", "Send: Peer does not exist")
trace!(target: "net", "Send: Peer no longer exist")
}
Ok(())
}
@@ -405,11 +405,23 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
fn have_session(&self, id: &NodeId) -> bool {
self.connections.read().unwrap().iter().any(|e| match *e.lock().unwrap().deref() { ConnectionEntry::Session(ref s) => s.info.id.eq(&id), _ => false })
self.connections.read().unwrap().iter().any(|e|
match *e.lock().unwrap().deref() { ConnectionEntry::Session(ref s) => s.info.id.eq(&id), _ => false })
}
fn session_count(&self) -> usize {
self.connections.read().unwrap().iter().filter(|e|
match *e.lock().unwrap().deref() { ConnectionEntry::Session(_) => true, _ => false }).count()
}
fn connecting_to(&self, id: &NodeId) -> bool {
self.connections.read().unwrap().iter().any(|e| match *e.lock().unwrap().deref() { ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false })
self.connections.read().unwrap().iter().any(|e|
match *e.lock().unwrap().deref() { ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false })
}
fn handshake_count(&self) -> usize {
self.connections.read().unwrap().iter().filter(|e|
match *e.lock().unwrap().deref() { ConnectionEntry::Handshake(_) => true, _ => false }).count()
}
fn keep_alive(&self, io: &IoContext<NetworkIoMessage<Message>>) {
@@ -423,64 +435,40 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
}
for p in to_kill {
self.kill_connection(p, io);
self.kill_connection(p, io, true);
}
}
fn connect_peers(&self, io: &IoContext<NetworkIoMessage<Message>>) {
struct NodeInfo {
id: NodeId,
peer_type: PeerType
let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers };
let connections = self.session_count();
if connections >= ideal_peers as usize {
return;
}
let mut to_connect: Vec<NodeInfo> = Vec::new();
let mut req_conn = 0;
let pin = self.info.read().unwrap().deref().config.pin;
let ideal_peers = self.info.read().unwrap().deref().config.ideal_peers;
for n in self.nodes.read().unwrap().nodes().map(|n| NodeInfo { id: n.id.clone(), peer_type: n.peer_type }) {
let connected = self.have_session(&n.id) || self.connecting_to(&n.id);
let required = n.peer_type == PeerType::Required;
if connected && required {
req_conn += 1;
}
else if !connected && (!pin || required) {
to_connect.push(n);
}
let handshake_count = self.handshake_count();
if handshake_count >= MAX_HANDSHAKES {
return;
}
for n in &to_connect {
if n.peer_type == PeerType::Required {
if req_conn < ideal_peers {
self.connect_peer(&n.id, io);
}
req_conn += 1;
}
}
if !pin {
let pending_count = 0; //TODO:
let peer_count = 0;
let mut open_slots = ideal_peers - peer_count - pending_count + req_conn;
if open_slots > 0 {
for n in &to_connect {
if n.peer_type == PeerType::Optional && open_slots > 0 {
open_slots -= 1;
self.connect_peer(&n.id, io);
}
}
}
let nodes = { self.nodes.read().unwrap().nodes() };
for id in nodes.iter().filter(|ref id| !self.have_session(id) && !self.connecting_to(id)).take(MAX_HANDSHAKES - handshake_count) {
self.connect_peer(&id, io);
}
debug!(target: "net", "Connecting peers: {} sessions, {} pending", self.session_count(), self.handshake_count());
}
#[allow(single_match)]
fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage<Message>>) {
if self.have_session(id)
{
debug!("Aborted connect. Node already connected.");
trace!("Aborted connect. Node already connected.");
return;
}
if self.connecting_to(id) {
debug!("Aborted connect. Node already connecting.");
trace!("Aborted connect. Node already connecting.");
return;
}
@@ -542,7 +530,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
ConnectionEntry::Handshake(ref mut h) => {
match h.writable(io, &self.info.read().unwrap()) {
Err(e) => {
debug!(target: "net", "Handshake write error: {:?}", e);
debug!(target: "net", "Handshake write error: {}:{:?}", token, e);
kill = true;
},
Ok(_) => ()
@@ -554,7 +542,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
ConnectionEntry::Session(ref mut s) => {
match s.writable(io, &self.info.read().unwrap()) {
Err(e) => {
debug!(target: "net", "Session write error: {:?}", e);
debug!(target: "net", "Session write error: {}:{:?}", token, e);
kill = true;
},
Ok(_) => ()
@@ -564,7 +552,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
}
if kill {
self.kill_connection(token, io); //TODO: mark connection as dead an check in kill_connection
self.kill_connection(token, io, true); //TODO: mark connection as dead an check in kill_connection
return;
} else if create_session {
self.start_session(token, io);
@@ -573,7 +561,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
fn connection_closed(&self, token: TimerToken, io: &IoContext<NetworkIoMessage<Message>>) {
self.kill_connection(token, io);
self.kill_connection(token, io, true);
}
fn connection_readable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
@@ -585,7 +573,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
match *connection.lock().unwrap().deref_mut() {
ConnectionEntry::Handshake(ref mut h) => {
if let Err(e) = h.readable(io, &self.info.read().unwrap()) {
debug!(target: "net", "Handshake read error: {:?}", e);
debug!(target: "net", "Handshake read error: {}:{:?}", token, e);
kill = true;
}
if h.done() {
@@ -595,7 +583,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
ConnectionEntry::Session(ref mut s) => {
match s.readable(io, &self.info.read().unwrap()) {
Err(e) => {
debug!(target: "net", "Handshake read error: {:?}", e);
debug!(target: "net", "Handshake read error: {}:{:?}", token, e);
kill = true;
},
Ok(SessionData::Ready) => {
@@ -621,7 +609,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
}
if kill {
self.kill_connection(token, io); //TODO: mark connection as dead an check in kill_connection
self.kill_connection(token, io, true); //TODO: mark connection as dead an check in kill_connection
return;
} else if create_session {
self.start_session(token, io);
@@ -657,17 +645,20 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
fn connection_timeout(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
self.kill_connection(token, io)
self.kill_connection(token, io, true)
}
fn kill_connection(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
fn kill_connection(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>, remote: bool) {
let mut to_disconnect: Vec<ProtocolId> = Vec::new();
{
let mut connections = self.connections.write().unwrap();
if let Some(connection) = connections.get(token).cloned() {
match *connection.lock().unwrap().deref_mut() {
ConnectionEntry::Handshake(_) => {
ConnectionEntry::Handshake(ref h) => {
connections.remove(token);
if remote {
self.nodes.write().unwrap().note_failure(h.id());
}
},
ConnectionEntry::Session(ref mut s) if s.is_ready() => {
for (p, _) in self.handlers.read().unwrap().iter() {
@@ -676,6 +667,9 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
}
connections.remove(token);
if remote {
self.nodes.write().unwrap().note_failure(s.id());
}
},
_ => {},
}
@@ -706,7 +700,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
}
for i in to_remove {
self.kill_connection(i, io);
self.kill_connection(i, io, false);
}
self.nodes.write().unwrap().update(node_changes);
}
@@ -816,7 +810,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
ConnectionEntry::Session(ref mut s) => { s.disconnect(DisconnectReason::DisconnectRequested); }
}
}
self.kill_connection(*peer, io);
self.kill_connection(*peer, io, false);
},
NetworkIoMessage::User(ref message) => {
for (p, h) in self.handlers.read().unwrap().iter() {