|
|
|
|
@@ -28,7 +28,7 @@ use std::fs;
|
|
|
|
|
use mio::*;
|
|
|
|
|
use mio::tcp::*;
|
|
|
|
|
use hash::*;
|
|
|
|
|
use misc::version;
|
|
|
|
|
use misc::*;
|
|
|
|
|
use crypto::*;
|
|
|
|
|
use sha3::Hashable;
|
|
|
|
|
use rlp::*;
|
|
|
|
|
@@ -202,7 +202,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
|
|
|
|
|
protocol: ProtocolId,
|
|
|
|
|
session: Option<SharedSession>, sessions: Arc<RwLock<Slab<SharedSession>>>,
|
|
|
|
|
reserved_peers: &'s HashSet<NodeId>) -> NetworkContext<'s, Message> {
|
|
|
|
|
let id = session.as_ref().map(|s| s.lock().unwrap().token());
|
|
|
|
|
let id = session.as_ref().map(|s| s.locked().token());
|
|
|
|
|
NetworkContext {
|
|
|
|
|
io: io,
|
|
|
|
|
protocol: protocol,
|
|
|
|
|
@@ -224,7 +224,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
|
|
|
|
|
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
|
|
|
|
|
let session = self.resolve_session(peer);
|
|
|
|
|
if let Some(session) = session {
|
|
|
|
|
try!(session.lock().unwrap().send_packet(self.io, self.protocol, packet_id as u8, &data));
|
|
|
|
|
try!(session.locked().send_packet(self.io, self.protocol, packet_id as u8, &data));
|
|
|
|
|
} else {
|
|
|
|
|
trace!(target: "network", "Send: Peer no longer exist")
|
|
|
|
|
}
|
|
|
|
|
@@ -262,7 +262,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
|
|
|
|
|
|
|
|
|
|
/// Check if the session is still active.
|
|
|
|
|
pub fn is_expired(&self) -> bool {
|
|
|
|
|
self.session.as_ref().map_or(false, |s| s.lock().unwrap().expired())
|
|
|
|
|
self.session.as_ref().map_or(false, |s| s.locked().expired())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
|
|
|
|
|
@@ -279,7 +279,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
|
|
|
|
|
pub fn peer_info(&self, peer: PeerId) -> String {
|
|
|
|
|
let session = self.resolve_session(peer);
|
|
|
|
|
if let Some(session) = session {
|
|
|
|
|
return session.lock().unwrap().info.client_version.clone()
|
|
|
|
|
return session.locked().info.client_version.clone()
|
|
|
|
|
}
|
|
|
|
|
"unknown".to_owned()
|
|
|
|
|
}
|
|
|
|
|
@@ -423,7 +423,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|
|
|
|
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
|
|
|
|
|
|
|
|
|
|
self.nodes.write().unwrap().add_node(n);
|
|
|
|
|
if let Some(ref mut discovery) = *self.discovery.lock().unwrap() {
|
|
|
|
|
if let Some(ref mut discovery) = *self.discovery.locked() {
|
|
|
|
|
discovery.add_node(entry);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -436,7 +436,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|
|
|
|
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
|
|
|
|
|
self.reserved_nodes.write().unwrap().insert(n.id.clone());
|
|
|
|
|
|
|
|
|
|
if let Some(ref mut discovery) = *self.discovery.lock().unwrap() {
|
|
|
|
|
if let Some(ref mut discovery) = *self.discovery.locked() {
|
|
|
|
|
discovery.add_node(entry);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -454,7 +454,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|
|
|
|
let reserved: HashSet<NodeId> = self.reserved_nodes.read().unwrap().clone();
|
|
|
|
|
let mut to_kill = Vec::new();
|
|
|
|
|
for e in self.sessions.write().unwrap().iter_mut() {
|
|
|
|
|
let mut s = e.lock().unwrap();
|
|
|
|
|
let mut s = e.locked();
|
|
|
|
|
{
|
|
|
|
|
let id = s.id();
|
|
|
|
|
if id.is_some() && reserved.contains(id.unwrap()) {
|
|
|
|
|
@@ -498,7 +498,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|
|
|
|
self.stopping.store(true, AtomicOrdering::Release);
|
|
|
|
|
let mut to_kill = Vec::new();
|
|
|
|
|
for e in self.sessions.write().unwrap().iter_mut() {
|
|
|
|
|
let mut s = e.lock().unwrap();
|
|
|
|
|
let mut s = e.locked();
|
|
|
|
|
s.disconnect(io, DisconnectReason::ClientQuit);
|
|
|
|
|
to_kill.push(s.token());
|
|
|
|
|
}
|
|
|
|
|
@@ -555,7 +555,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|
|
|
|
for n in self.nodes.read().unwrap().unordered_entries() {
|
|
|
|
|
discovery.add_node(n.clone());
|
|
|
|
|
}
|
|
|
|
|
*self.discovery.lock().unwrap() = Some(discovery);
|
|
|
|
|
*self.discovery.locked() = Some(discovery);
|
|
|
|
|
io.register_stream(DISCOVERY).expect("Error registering UDP listener");
|
|
|
|
|
io.register_timer(DISCOVERY_REFRESH, 7200).expect("Error registering discovery timer");
|
|
|
|
|
io.register_timer(DISCOVERY_ROUND, 300).expect("Error registering discovery timer");
|
|
|
|
|
@@ -571,7 +571,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn have_session(&self, id: &NodeId) -> bool {
|
|
|
|
|
self.sessions.read().unwrap().iter().any(|e| e.lock().unwrap().info.id == Some(id.clone()))
|
|
|
|
|
self.sessions.read().unwrap().iter().any(|e| e.locked().info.id == Some(id.clone()))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn session_count(&self) -> usize {
|
|
|
|
|
@@ -579,7 +579,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn connecting_to(&self, id: &NodeId) -> bool {
|
|
|
|
|
self.sessions.read().unwrap().iter().any(|e| e.lock().unwrap().id() == Some(id))
|
|
|
|
|
self.sessions.read().unwrap().iter().any(|e| e.locked().id() == Some(id))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn handshake_count(&self) -> usize {
|
|
|
|
|
@@ -589,7 +589,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|
|
|
|
fn keep_alive(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
|
|
|
|
let mut to_kill = Vec::new();
|
|
|
|
|
for e in self.sessions.write().unwrap().iter_mut() {
|
|
|
|
|
let mut s = e.lock().unwrap();
|
|
|
|
|
let mut s = e.locked();
|
|
|
|
|
if !s.keep_alive(io) {
|
|
|
|
|
s.disconnect(io, DisconnectReason::PingTimeout);
|
|
|
|
|
to_kill.push(s.token());
|
|
|
|
|
@@ -711,7 +711,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|
|
|
|
fn accept(&self, io: &IoContext<NetworkIoMessage<Message>>) {
|
|
|
|
|
trace!(target: "network", "Accepting incoming connection");
|
|
|
|
|
loop {
|
|
|
|
|
let socket = match self.tcp_listener.lock().unwrap().accept() {
|
|
|
|
|
let socket = match self.tcp_listener.locked().accept() {
|
|
|
|
|
Ok(None) => break,
|
|
|
|
|
Ok(Some((sock, _addr))) => sock,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
@@ -728,7 +728,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|
|
|
|
fn session_writable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
|
|
|
|
|
let session = { self.sessions.read().unwrap().get(token).cloned() };
|
|
|
|
|
if let Some(session) = session {
|
|
|
|
|
let mut s = session.lock().unwrap();
|
|
|
|
|
let mut s = session.locked();
|
|
|
|
|
if let Err(e) = s.writable(io, &self.info.read().unwrap()) {
|
|
|
|
|
trace!(target: "network", "Session write error: {}: {:?}", token, e);
|
|
|
|
|
}
|
|
|
|
|
@@ -750,7 +750,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|
|
|
|
let mut kill = false;
|
|
|
|
|
let session = { self.sessions.read().unwrap().get(token).cloned() };
|
|
|
|
|
if let Some(session) = session.clone() {
|
|
|
|
|
let mut s = session.lock().unwrap();
|
|
|
|
|
let mut s = session.locked();
|
|
|
|
|
loop {
|
|
|
|
|
match s.readable(io, &self.info.read().unwrap()) {
|
|
|
|
|
Err(e) => {
|
|
|
|
|
@@ -785,7 +785,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|
|
|
|
if let Ok(address) = s.remote_addr() {
|
|
|
|
|
let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
|
|
|
|
|
self.nodes.write().unwrap().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
|
|
|
|
|
let mut discovery = self.discovery.lock().unwrap();
|
|
|
|
|
let mut discovery = self.discovery.locked();
|
|
|
|
|
if let Some(ref mut discovery) = *discovery.deref_mut() {
|
|
|
|
|
discovery.add_node(entry);
|
|
|
|
|
}
|
|
|
|
|
@@ -843,7 +843,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|
|
|
|
let sessions = self.sessions.write().unwrap();
|
|
|
|
|
if let Some(session) = sessions.get(token).cloned() {
|
|
|
|
|
expired_session = Some(session.clone());
|
|
|
|
|
let mut s = session.lock().unwrap();
|
|
|
|
|
let mut s = session.locked();
|
|
|
|
|
if !s.expired() {
|
|
|
|
|
if s.is_ready() {
|
|
|
|
|
self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst);
|
|
|
|
|
@@ -879,7 +879,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|
|
|
|
{
|
|
|
|
|
let sessions = self.sessions.write().unwrap();
|
|
|
|
|
for c in sessions.iter() {
|
|
|
|
|
let s = c.lock().unwrap();
|
|
|
|
|
let s = c.locked();
|
|
|
|
|
if let Some(id) = s.id() {
|
|
|
|
|
if node_changes.removed.contains(id) {
|
|
|
|
|
to_remove.push(s.token());
|
|
|
|
|
@@ -918,7 +918,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|
|
|
|
match stream {
|
|
|
|
|
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
|
|
|
|
|
DISCOVERY => {
|
|
|
|
|
let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().readable(io) };
|
|
|
|
|
let node_changes = { self.discovery.locked().as_mut().unwrap().readable(io) };
|
|
|
|
|
if let Some(node_changes) = node_changes {
|
|
|
|
|
self.update_nodes(io, node_changes);
|
|
|
|
|
}
|
|
|
|
|
@@ -935,7 +935,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|
|
|
|
match stream {
|
|
|
|
|
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
|
|
|
|
|
DISCOVERY => {
|
|
|
|
|
self.discovery.lock().unwrap().as_mut().unwrap().writable(io);
|
|
|
|
|
self.discovery.locked().as_mut().unwrap().writable(io);
|
|
|
|
|
}
|
|
|
|
|
_ => panic!("Received unknown writable token"),
|
|
|
|
|
}
|
|
|
|
|
@@ -951,11 +951,11 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|
|
|
|
warn!("Error initializing public interface: {:?}", e)),
|
|
|
|
|
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
|
|
|
|
|
DISCOVERY_REFRESH => {
|
|
|
|
|
self.discovery.lock().unwrap().as_mut().unwrap().refresh();
|
|
|
|
|
self.discovery.locked().as_mut().unwrap().refresh();
|
|
|
|
|
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
|
|
|
|
|
},
|
|
|
|
|
DISCOVERY_ROUND => {
|
|
|
|
|
let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().round() };
|
|
|
|
|
let node_changes = { self.discovery.locked().as_mut().unwrap().round() };
|
|
|
|
|
if let Some(node_changes) = node_changes {
|
|
|
|
|
self.update_nodes(io, node_changes);
|
|
|
|
|
}
|
|
|
|
|
@@ -1015,7 +1015,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|
|
|
|
NetworkIoMessage::Disconnect(ref peer) => {
|
|
|
|
|
let session = { self.sessions.read().unwrap().get(*peer).cloned() };
|
|
|
|
|
if let Some(session) = session {
|
|
|
|
|
session.lock().unwrap().disconnect(io, DisconnectReason::DisconnectRequested);
|
|
|
|
|
session.locked().disconnect(io, DisconnectReason::DisconnectRequested);
|
|
|
|
|
}
|
|
|
|
|
trace!(target: "network", "Disconnect requested {}", peer);
|
|
|
|
|
self.kill_connection(*peer, io, false);
|
|
|
|
|
@@ -1023,8 +1023,8 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|
|
|
|
NetworkIoMessage::DisablePeer(ref peer) => {
|
|
|
|
|
let session = { self.sessions.read().unwrap().get(*peer).cloned() };
|
|
|
|
|
if let Some(session) = session {
|
|
|
|
|
session.lock().unwrap().disconnect(io, DisconnectReason::DisconnectRequested);
|
|
|
|
|
if let Some(id) = session.lock().unwrap().id() {
|
|
|
|
|
session.locked().disconnect(io, DisconnectReason::DisconnectRequested);
|
|
|
|
|
if let Some(id) = session.locked().id() {
|
|
|
|
|
self.nodes.write().unwrap().mark_as_useless(id)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1046,11 +1046,11 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|
|
|
|
FIRST_SESSION ... LAST_SESSION => {
|
|
|
|
|
let session = { self.sessions.read().unwrap().get(stream).cloned() };
|
|
|
|
|
if let Some(session) = session {
|
|
|
|
|
session.lock().unwrap().register_socket(reg, event_loop).expect("Error registering socket");
|
|
|
|
|
session.locked().register_socket(reg, event_loop).expect("Error registering socket");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
|
|
|
|
|
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
|
|
|
|
|
DISCOVERY => self.discovery.locked().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
|
|
|
|
|
TCP_ACCEPT => event_loop.register(&*self.tcp_listener.locked(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
|
|
|
|
|
_ => warn!("Unexpected stream registration")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1060,7 +1060,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|
|
|
|
FIRST_SESSION ... LAST_SESSION => {
|
|
|
|
|
let mut connections = self.sessions.write().unwrap();
|
|
|
|
|
if let Some(connection) = connections.get(stream).cloned() {
|
|
|
|
|
connection.lock().unwrap().deregister_socket(event_loop).expect("Error deregistering socket");
|
|
|
|
|
connection.locked().deregister_socket(event_loop).expect("Error deregistering socket");
|
|
|
|
|
connections.remove(stream);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -1074,11 +1074,11 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|
|
|
|
FIRST_SESSION ... LAST_SESSION => {
|
|
|
|
|
let connection = { self.sessions.read().unwrap().get(stream).cloned() };
|
|
|
|
|
if let Some(connection) = connection {
|
|
|
|
|
connection.lock().unwrap().update_socket(reg, event_loop).expect("Error updating socket");
|
|
|
|
|
connection.locked().update_socket(reg, event_loop).expect("Error updating socket");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
|
|
|
|
|
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock().unwrap(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
|
|
|
|
DISCOVERY => self.discovery.locked().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
|
|
|
|
|
TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.locked(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
|
|
|
|
|
_ => warn!("Unexpected stream update")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|