diff --git a/src/network/.host.rs.rustfmt b/src/network/.host.rs.rustfmt deleted file mode 100644 index 50954661c..000000000 --- a/src/network/.host.rs.rustfmt +++ /dev/null @@ -1,685 +0,0 @@ -#![allow(dead_code)] //TODO: remove this after everything is done -//TODO: remove all unwraps -use std::net::{SocketAddr, ToSocketAddrs}; -use std::collections::{HashMap}; -use std::hash::{Hash, Hasher}; -use std::str::{FromStr}; -use mio::*; -use mio::util::{Slab}; -use mio::tcp::*; -use mio::udp::*; -use hash::*; -use crypto::*; -use rlp::*; -use time::Tm; -use network::handshake::Handshake; -use network::session::{Session, SessionData}; -use network::{Error, ProtocolHandler}; - -const DEFAULT_PORT: u16 = 30304; - -const MAX_CONNECTIONS: usize = 1024; -const MAX_USER_TIMERS: usize = 32; -const IDEAL_PEERS:u32 = 10; - -pub type NodeId = H512; -pub type TimerToken = usize; - -#[derive(Debug)] -struct NetworkConfiguration { - listen_address: SocketAddr, - public_address: SocketAddr, - no_nat: bool, - no_discovery: bool, - pin: bool, -} - -impl NetworkConfiguration { - fn new() -> NetworkConfiguration { - NetworkConfiguration { - listen_address: SocketAddr::from_str("0.0.0.0:30304").unwrap(), - public_address: SocketAddr::from_str("0.0.0.0:30304").unwrap(), - no_nat: false, - no_discovery: false, - pin: false, - } - } -} - -#[derive(Debug)] -pub struct NodeEndpoint { - address: SocketAddr, - address_str: String, - udp_port: u16 -} - -impl NodeEndpoint { - fn new(address: SocketAddr) -> NodeEndpoint { - NodeEndpoint { - address: address, - address_str: address.to_string(), - udp_port: address.port() - } - } - fn from_str(s: &str) -> Result { - println!("{:?}", s); - let address = s.to_socket_addrs().map(|mut i| i.next()); - match address { - Ok(Some(a)) => Ok(NodeEndpoint { - address: a, - address_str: s.to_string(), - udp_port: a.port() - }), - Ok(_) => Err(Error::AddressResolve(None)), - Err(e) => Err(Error::AddressResolve(Some(e))) - } - } -} - -#[derive(PartialEq, Eq, Copy, Clone)] -enum PeerType { - Required, - Optional -} - -struct Node { - id: NodeId, - endpoint: NodeEndpoint, - peer_type: PeerType, - last_attempted: Option, - confirmed: bool, -} - -impl FromStr for Node { - type Err = Error; - fn from_str(s: &str) -> Result { - let (id, endpoint) = if &s[0..8] == "enode://" && s.len() > 136 && &s[136..137] == "@" { - (try!(NodeId::from_str(&s[8..136])), try!(NodeEndpoint::from_str(&s[137..]))) - } - else { - (NodeId::new(), try!(NodeEndpoint::from_str(s))) - }; - - Ok(Node { - id: id, - endpoint: endpoint, - peer_type: PeerType::Optional, - last_attempted: None, - confirmed: false - }) - } -} - -impl Node { - fn new(id: NodeId, address: SocketAddr, t:PeerType) -> Node { - Node { - id: id, - endpoint: NodeEndpoint::new(address), - peer_type: t, - last_attempted: None, - confirmed: false - } - } -} - -impl PartialEq for Node { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} -impl Eq for Node { } - -impl Hash for Node { - fn hash(&self, state: &mut H) where H: Hasher { - self.id.hash(state) - } -} - -// Tokens -const TCP_ACCEPT: usize = 1; -const IDLE: usize = 3; -const NODETABLE_RECEIVE: usize = 4; -const NODETABLE_MAINTAIN: usize = 5; -const NODETABLE_DISCOVERY: usize = 6; -const FIRST_CONNECTION: usize = 7; -const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1; -const USER_TIMER: usize = LAST_CONNECTION; -const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; - -pub type PacketId = u8; -pub type ProtocolId = &'static str; - -pub enum HostMessage { - Shutdown, - AddHandler { - handler: Box, - protocol: ProtocolId, - versions: Vec, - }, - Send { - peer: PeerId, - packet_id: PacketId, - protocol: ProtocolId, - data: Vec, - }, - UserMessage(UserMessage), -} - -pub type UserMessageId = u32; - -pub struct UserMessage { - protocol: ProtocolId, - id: UserMessageId, - data: Option>, -} - -pub type PeerId = usize; - -#[derive(Debug, PartialEq, Eq)] -pub struct CapabilityInfo { - pub protocol: ProtocolId, - pub version: u8, - pub packet_count: u8, -} - -impl Encodable for CapabilityInfo { - fn encode(&self, encoder: &mut E) -> () where E: Encoder { - encoder.emit_list(|e| { - self.protocol.encode(e); - (self.version as u32).encode(e); - }); - } -} - -pub struct HostIo<'s> { - protocol: ProtocolId, - connections: &'s mut Slab, - timers: &'s mut Slab, - session: Option, - event_loop: &'s mut EventLoop, -} - -impl<'s> HostIo<'s> { - fn new(protocol: ProtocolId, session: Option, event_loop: &'s mut EventLoop, connections: &'s mut Slab, timers: &'s mut Slab) -> HostIo<'s> { - HostIo { - protocol: protocol, - session: session, - event_loop: event_loop, - connections: connections, - timers: timers, - } - } - - pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error> { - match self.connections.get_mut(Token(peer)) { - Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| { - warn!(target: "net", "Send error: {:?}", e); - }); //TODO: don't copy vector data - }, - _ => { - warn!(target: "net", "Send: Peer does not exist"); - } - } - Ok(()) - } - - pub fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), Error> { - match self.session { - Some(session) => self.send(session.as_usize(), packet_id, data), - None => { - panic!("Respond: Session does not exist") - } - } - } - - pub fn register_timer(&mut self, ms: u64) -> Result{ - match self.timers.insert(UserTimer { - delay: ms, - protocol: self.protocol, - }) { - Ok(token) => { - self.event_loop.timeout_ms(token, ms).expect("Error registering user timer"); - Ok(token.as_usize()) - }, - _ => { panic!("Max timers reached") } - } - } - - pub fn message(&mut self, id: UserMessageId, data: Option>) { - match self.event_loop.channel().send(HostMessage::UserMessage(UserMessage { - protocol: self.protocol, - id: id, - data: data - })) { - Ok(_) => {} - Err(e) => { panic!("Error sending io message {:?}", e); } - } - } - - pub fn disable_peer(&mut self, _peer: PeerId) { - //TODO: remove capability, disconnect if no capabilities left - } - -} - -struct UserTimer { - protocol: ProtocolId, - delay: u64, -} - -pub struct HostInfo { - keys: KeyPair, - config: NetworkConfiguration, - nonce: H256, - pub protocol_version: u32, - pub client_version: String, - pub listen_port: u16, - pub capabilities: Vec -} - -impl HostInfo { - pub fn id(&self) -> &NodeId { - self.keys.public() - } - - pub fn secret(&self) -> &Secret { - self.keys.secret() - } - pub fn next_nonce(&mut self) -> H256 { - self.nonce = self.nonce.sha3(); - return self.nonce.clone(); - } -} - -enum ConnectionEntry { - Handshake(Handshake), - Session(Session) -} - -pub struct Host { - info: HostInfo, - udp_socket: UdpSocket, - listener: TcpListener, - connections: Slab, - timers: Slab, - nodes: HashMap, - handlers: HashMap>, - idle_timeout: Timeout, - channel: Sender, -} - -impl Host { - pub fn start(event_loop: &mut EventLoop) -> Result<(), Error> { - let config = NetworkConfiguration::new(); - /* - match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() { - Some(iface) => config.public_address = iface.addr.unwrap(), - None => warn!("No public network interface"), - } - */ - - let addr = config.listen_address; - // Setup the server socket - let listener = TcpListener::bind(&addr).unwrap(); - // Start listening for incoming connections - event_loop.register(&listener, Token(TCP_ACCEPT), EventSet::readable(), PollOpt::edge()).unwrap(); - // Setup the client socket - //let sock = TcpStream::connect(&addr).unwrap(); - // Register the socket - //self.event_loop.register(&sock, CLIENT, EventSet::readable(), PollOpt::edge()).unwrap(); - let idle_timeout = event_loop.timeout_ms(Token(IDLE), 1000).unwrap(); //TODO: check delay - // open the udp socket - let udp_socket = UdpSocket::bound(&addr).unwrap(); - event_loop.register(&udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap(); - event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap(); - let port = config.listen_address.port(); - - let mut host = Host { - info: HostInfo { - keys: KeyPair::create().unwrap(), - config: config, - nonce: H256::random(), - protocol_version: 4, - client_version: "parity".to_string(), - listen_port: port, - //capabilities: vec![ CapabilityInfo { protocol: "eth".to_string(), version: 63 }], - capabilities: Vec::new(), - }, - udp_socket: udp_socket, - listener: listener, - connections: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS), - timers: Slab::new_starting_at(Token(USER_TIMER), MAX_USER_TIMERS), - nodes: HashMap::new(), - handlers: HashMap::new(), - idle_timeout: idle_timeout, - channel: event_loop.channel(), - }; - - host.add_node("enode://c022e7a27affdd1632f2e67dffeb87f02bf506344bb142e08d12b28e7e5c6e5dbb8183a46a77bff3631b51c12e8cf15199f797feafdc8834aaf078ad1a2bcfa0@127.0.0.1:30303"); - host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@gav.ethdev.com:30300"); - host.add_node("enode://e58d5e26b3b630496ec640f2530f3e7fa8a8c7dfe79d9e9c4aac80e3730132b869c852d3125204ab35bb1b1951f6f2d40996c1034fd8c5a69b383ee337f02ddc@gav.ethdev.com:30303"); - host.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); - host.add_node("enode://7f25d3eab333a6b98a8b5ed68d962bb22c876ffcd5561fca54e3c2ef27f754df6f7fd7c9b74cc919067abac154fb8e1f8385505954f161ae440abc355855e034@54.207.93.166:30303"); - host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@92.51.165.126:30303"); - - try!(event_loop.run(&mut host)); - Ok(()) - } - - fn add_node(&mut self, id: &str) { - match Node::from_str(id) { - Err(e) => { warn!("Could not add node: {:?}", e); }, - Ok(n) => { - self.nodes.insert(n.id.clone(), n); - } - } - } - - fn maintain_network(&mut self, event_loop: &mut EventLoop) { - self.connect_peers(event_loop); - } - - fn have_session(&self, id: &NodeId) -> bool { - self.connections.iter().any(|e| match e { &ConnectionEntry::Session(ref s) => s.info.id.eq(&id), _ => false }) - } - - fn connecting_to(&self, id: &NodeId) -> bool { - self.connections.iter().any(|e| match e { &ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false }) - } - - fn connect_peers(&mut self, event_loop: &mut EventLoop) { - - struct NodeInfo { - id: NodeId, - peer_type: PeerType - } - - let mut to_connect: Vec = Vec::new(); - - let mut req_conn = 0; - //TODO: use nodes from discovery here - //for n in self.node_buckets.iter().flat_map(|n| &n.nodes).map(|id| NodeInfo { id: id.clone(), peer_type: self.nodes.get(id).unwrap().peer_type}) { - for n in self.nodes.values().map(|n| NodeInfo { id: n.id.clone(), peer_type: n.peer_type }) { - let connected = self.have_session(&n.id) || self.connecting_to(&n.id); - let required = n.peer_type == PeerType::Required; - if connected && required { - req_conn += 1; - } - else if !connected && (!self.info.config.pin || required) { - to_connect.push(n); - } - } - - for n in to_connect.iter() { - if n.peer_type == PeerType::Required { - if req_conn < IDEAL_PEERS { - self.connect_peer(&n.id, event_loop); - } - req_conn += 1; - } - } - - if !self.info.config.pin - { - let pending_count = 0; //TODO: - let peer_count = 0; - let mut open_slots = IDEAL_PEERS - peer_count - pending_count + req_conn; - if open_slots > 0 { - for n in to_connect.iter() { - if n.peer_type == PeerType::Optional && open_slots > 0 { - open_slots -= 1; - self.connect_peer(&n.id, event_loop); - } - } - } - } - } - - fn connect_peer(&mut self, id: &NodeId, event_loop: &mut EventLoop) { - if self.have_session(id) - { - warn!("Aborted connect. Node already connected."); - return; - } - if self.connecting_to(id) - { - warn!("Aborted connect. Node already connecting."); - return; - } - - let socket = { - let node = self.nodes.get_mut(id).unwrap(); - node.last_attempted = Some(::time::now()); - - - //blog(NetConnect) << "Attempting connection to node" << _p->id << "@" << ep << "from" << id(); - match TcpStream::connect(&node.endpoint.address) { - Ok(socket) => socket, - Err(_) => { - warn!("Cannot connect to node"); - return; - } - } - }; - - let nonce = self.info.next_nonce(); - match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(token, id, socket, &nonce).expect("Can't create handshake"))) { - Some(token) => { - match self.connections.get_mut(token) { - Some(&mut ConnectionEntry::Handshake(ref mut h)) => { - h.start(&self.info, true) - .and_then(|_| h.register(event_loop)) - .unwrap_or_else (|e| { - debug!(target: "net", "Handshake create error: {:?}", e); - }); - }, - _ => {} - } - }, - None => { warn!("Max connections reached") } - } - } - - - fn accept(&mut self, _event_loop: &mut EventLoop) { - warn!(target: "net", "accept"); - } - - fn connection_writable(&mut self, token: Token, event_loop: &mut EventLoop) { - let mut kill = false; - let mut create_session = false; - { - match self.connections.get_mut(token) { - Some(&mut ConnectionEntry::Handshake(ref mut h)) => { - h.writable(event_loop, &self.info).unwrap_or_else(|e| { - debug!(target: "net", "Handshake write error: {:?}", e); - kill = true; - }); - create_session = h.done(); - }, - Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.writable(event_loop, &self.info).unwrap_or_else(|e| { - debug!(target: "net", "Session write error: {:?}", e); - kill = true; - }); - } - _ => { - warn!(target: "net", "Received event for unknown connection"); - } - }; - } - if kill { - self.kill_connection(token, event_loop); - } - if create_session { - self.start_session(token, event_loop); - } - } - - - fn connection_readable(&mut self, token: Token, event_loop: &mut EventLoop) { - let mut kill = false; - let mut create_session = false; - let mut ready_data: Vec = Vec::new(); - let mut packet_data: Option<(ProtocolId, PacketId, Vec)> = None; - { - match self.connections.get_mut(token) { - Some(&mut ConnectionEntry::Handshake(ref mut h)) => { - h.readable(event_loop, &self.info).unwrap_or_else(|e| { - debug!(target: "net", "Handshake read error: {:?}", e); - kill = true; - }); - create_session = h.done(); - }, - Some(&mut ConnectionEntry::Session(ref mut s)) => { - let sd = { s.readable(event_loop, &self.info).unwrap_or_else(|e| { - debug!(target: "net", "Session read error: {:?}", e); - kill = true; - SessionData::None - }) }; - match sd { - SessionData::Ready => { - for (p, _) in self.handlers.iter_mut() { - if s.have_capability(p) { - ready_data.push(p); - } - } - }, - SessionData::Packet { - data, - protocol, - packet_id, - } => { - match self.handlers.get_mut(protocol) { - None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) }, - Some(_) => packet_data = Some((protocol, packet_id, data)), - } - }, - SessionData::None => {}, - } - } - _ => { - warn!(target: "net", "Received event for unknown connection"); - } - }; - } - if kill { - self.kill_connection(token, event_loop); - } - if create_session { - self.start_session(token, event_loop); - } - for p in ready_data { - let mut h = self.handlers.get_mut(p).unwrap(); - h.connected(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize()); - } - if let Some((p, packet_id, data)) = packet_data { - let mut h = self.handlers.get_mut(p).unwrap(); - h.read(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize(), packet_id, &data[1..]); - } - - } - - fn start_session(&mut self, token: Token, event_loop: &mut EventLoop) { - let info = &self.info; - self.connections.replace_with(token, |c| { - match c { - ConnectionEntry::Handshake(h) => Session::new(h, event_loop, info) - .map(|s| Some(ConnectionEntry::Session(s))) - .unwrap_or_else(|e| { - debug!(target: "net", "Session construction error: {:?}", e); - None - }), - _ => { panic!("No handshake to create a session from"); } - } - }).expect("Error updating slab with session"); - } - - fn connection_timeout(&mut self, token: Token, event_loop: &mut EventLoop) { - self.kill_connection(token, event_loop) - } - fn kill_connection(&mut self, token: Token, _event_loop: &mut EventLoop) { - self.connections.remove(token); - } -} - -impl Handler for Host { - type Timeout = Token; - type Message = HostMessage; - - fn ready(&mut self, event_loop: &mut EventLoop, token: Token, events: EventSet) { - if events.is_readable() { - match token.as_usize() { - TCP_ACCEPT => self.accept(event_loop), - IDLE => self.maintain_network(event_loop), - FIRST_CONNECTION ... LAST_CONNECTION => self.connection_readable(token, event_loop), - NODETABLE_RECEIVE => {}, - _ => panic!("Received unknown readable token"), - } - } - else if events.is_writable() { - match token.as_usize() { - FIRST_CONNECTION ... LAST_CONNECTION => self.connection_writable(token, event_loop), - _ => panic!("Received unknown writable token"), - } - } - } - - fn timeout(&mut self, event_loop: &mut EventLoop, token: Token) { - match token.as_usize() { - IDLE => self.maintain_network(event_loop), - FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, event_loop), - NODETABLE_DISCOVERY => {}, - NODETABLE_MAINTAIN => {}, - USER_TIMER ... LAST_USER_TIMER => { - let protocol = self.timers.get_mut(token).expect("Unknown user timer token").protocol; - match self.handlers.get_mut(protocol) { - None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) }, - Some(h) => h.timeout(&mut HostIo::new(protocol, None, event_loop, &mut self.connections, &mut self.timers), token.as_usize()), - } - } - _ => panic!("Unknown timer token"), - } - } - - fn notify(&mut self, event_loop: &mut EventLoop, msg: Self::Message) { - match msg { - HostMessage::Shutdown => event_loop.shutdown(), - HostMessage::AddHandler { - handler, - protocol, - versions - } => { - self.handlers.insert(protocol, handler); - for v in versions { - self.info.capabilities.push(CapabilityInfo { protocol: protocol, version: v, packet_count:0 }); - } - }, - HostMessage::Send { - peer, - packet_id, - protocol, - data, - } => { - match self.connections.get_mut(Token(peer as usize)) { - Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.send_packet(protocol, packet_id as u8, &data).unwrap_or_else(|e| { - warn!(target: "net", "Send error: {:?}", e); - }); //TODO: don't copy vector data - }, - _ => { - warn!(target: "net", "Send: Peer does not exist"); - } - } - }, - HostMessage::UserMessage(message) => { - for (p, h) in self.handlers.iter_mut() { - if p != &message.protocol { - h.message(&mut HostIo::new(message.protocol, None, event_loop, &mut self.connections, &mut self.timers), &message); - } - } - } - } - } -}