diff --git a/Cargo.toml b/Cargo.toml index 0e584d8d7..9da1b73c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,8 +15,8 @@ mio = "0.5.0" rand = "0.3.12" time = "0.1.34" tiny-keccak = "1.0" -rocksdb = "0.2.1" -lazy_static = "0.1.0" +rocksdb = "0.2" +lazy_static = "0.1" eth-secp256k1 = { git = "https://github.com/arkpar/rust-secp256k1.git" } rust-crypto = "0.2.34" elastic-array = "0.4" diff --git a/src/network/.host.rs.rustfmt b/src/network/.host.rs.rustfmt new file mode 100644 index 000000000..50954661c --- /dev/null +++ b/src/network/.host.rs.rustfmt @@ -0,0 +1,685 @@ +#![allow(dead_code)] //TODO: remove this after everything is done +//TODO: remove all unwraps +use std::net::{SocketAddr, ToSocketAddrs}; +use std::collections::{HashMap}; +use std::hash::{Hash, Hasher}; +use std::str::{FromStr}; +use mio::*; +use mio::util::{Slab}; +use mio::tcp::*; +use mio::udp::*; +use hash::*; +use crypto::*; +use rlp::*; +use time::Tm; +use network::handshake::Handshake; +use network::session::{Session, SessionData}; +use network::{Error, ProtocolHandler}; + +const DEFAULT_PORT: u16 = 30304; + +const MAX_CONNECTIONS: usize = 1024; +const MAX_USER_TIMERS: usize = 32; +const IDEAL_PEERS:u32 = 10; + +pub type NodeId = H512; +pub type TimerToken = usize; + +#[derive(Debug)] +struct NetworkConfiguration { + listen_address: SocketAddr, + public_address: SocketAddr, + no_nat: bool, + no_discovery: bool, + pin: bool, +} + +impl NetworkConfiguration { + fn new() -> NetworkConfiguration { + NetworkConfiguration { + listen_address: SocketAddr::from_str("0.0.0.0:30304").unwrap(), + public_address: SocketAddr::from_str("0.0.0.0:30304").unwrap(), + no_nat: false, + no_discovery: false, + pin: false, + } + } +} + +#[derive(Debug)] +pub struct NodeEndpoint { + address: SocketAddr, + address_str: String, + udp_port: u16 +} + +impl NodeEndpoint { + fn new(address: SocketAddr) -> NodeEndpoint { + NodeEndpoint { + address: address, + address_str: address.to_string(), + udp_port: address.port() + } + } + fn from_str(s: &str) -> Result { + println!("{:?}", s); + let address = s.to_socket_addrs().map(|mut i| i.next()); + match address { + Ok(Some(a)) => Ok(NodeEndpoint { + address: a, + address_str: s.to_string(), + udp_port: a.port() + }), + Ok(_) => Err(Error::AddressResolve(None)), + Err(e) => Err(Error::AddressResolve(Some(e))) + } + } +} + +#[derive(PartialEq, Eq, Copy, Clone)] +enum PeerType { + Required, + Optional +} + +struct Node { + id: NodeId, + endpoint: NodeEndpoint, + peer_type: PeerType, + last_attempted: Option, + confirmed: bool, +} + +impl FromStr for Node { + type Err = Error; + fn from_str(s: &str) -> Result { + let (id, endpoint) = if &s[0..8] == "enode://" && s.len() > 136 && &s[136..137] == "@" { + (try!(NodeId::from_str(&s[8..136])), try!(NodeEndpoint::from_str(&s[137..]))) + } + else { + (NodeId::new(), try!(NodeEndpoint::from_str(s))) + }; + + Ok(Node { + id: id, + endpoint: endpoint, + peer_type: PeerType::Optional, + last_attempted: None, + confirmed: false + }) + } +} + +impl Node { + fn new(id: NodeId, address: SocketAddr, t:PeerType) -> Node { + Node { + id: id, + endpoint: NodeEndpoint::new(address), + peer_type: t, + last_attempted: None, + confirmed: false + } + } +} + +impl PartialEq for Node { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} +impl Eq for Node { } + +impl Hash for Node { + fn hash(&self, state: &mut H) where H: Hasher { + self.id.hash(state) + } +} + +// Tokens +const TCP_ACCEPT: usize = 1; +const IDLE: usize = 3; +const NODETABLE_RECEIVE: usize = 4; +const NODETABLE_MAINTAIN: usize = 5; +const NODETABLE_DISCOVERY: usize = 6; +const FIRST_CONNECTION: usize = 7; +const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1; +const USER_TIMER: usize = LAST_CONNECTION; +const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; + +pub type PacketId = u8; +pub type ProtocolId = &'static str; + +pub enum HostMessage { + Shutdown, + AddHandler { + handler: Box, + protocol: ProtocolId, + versions: Vec, + }, + Send { + peer: PeerId, + packet_id: PacketId, + protocol: ProtocolId, + data: Vec, + }, + UserMessage(UserMessage), +} + +pub type UserMessageId = u32; + +pub struct UserMessage { + protocol: ProtocolId, + id: UserMessageId, + data: Option>, +} + +pub type PeerId = usize; + +#[derive(Debug, PartialEq, Eq)] +pub struct CapabilityInfo { + pub protocol: ProtocolId, + pub version: u8, + pub packet_count: u8, +} + +impl Encodable for CapabilityInfo { + fn encode(&self, encoder: &mut E) -> () where E: Encoder { + encoder.emit_list(|e| { + self.protocol.encode(e); + (self.version as u32).encode(e); + }); + } +} + +pub struct HostIo<'s> { + protocol: ProtocolId, + connections: &'s mut Slab, + timers: &'s mut Slab, + session: Option, + event_loop: &'s mut EventLoop, +} + +impl<'s> HostIo<'s> { + fn new(protocol: ProtocolId, session: Option, event_loop: &'s mut EventLoop, connections: &'s mut Slab, timers: &'s mut Slab) -> HostIo<'s> { + HostIo { + protocol: protocol, + session: session, + event_loop: event_loop, + connections: connections, + timers: timers, + } + } + + pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error> { + match self.connections.get_mut(Token(peer)) { + Some(&mut ConnectionEntry::Session(ref mut s)) => { + s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| { + warn!(target: "net", "Send error: {:?}", e); + }); //TODO: don't copy vector data + }, + _ => { + warn!(target: "net", "Send: Peer does not exist"); + } + } + Ok(()) + } + + pub fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), Error> { + match self.session { + Some(session) => self.send(session.as_usize(), packet_id, data), + None => { + panic!("Respond: Session does not exist") + } + } + } + + pub fn register_timer(&mut self, ms: u64) -> Result{ + match self.timers.insert(UserTimer { + delay: ms, + protocol: self.protocol, + }) { + Ok(token) => { + self.event_loop.timeout_ms(token, ms).expect("Error registering user timer"); + Ok(token.as_usize()) + }, + _ => { panic!("Max timers reached") } + } + } + + pub fn message(&mut self, id: UserMessageId, data: Option>) { + match self.event_loop.channel().send(HostMessage::UserMessage(UserMessage { + protocol: self.protocol, + id: id, + data: data + })) { + Ok(_) => {} + Err(e) => { panic!("Error sending io message {:?}", e); } + } + } + + pub fn disable_peer(&mut self, _peer: PeerId) { + //TODO: remove capability, disconnect if no capabilities left + } + +} + +struct UserTimer { + protocol: ProtocolId, + delay: u64, +} + +pub struct HostInfo { + keys: KeyPair, + config: NetworkConfiguration, + nonce: H256, + pub protocol_version: u32, + pub client_version: String, + pub listen_port: u16, + pub capabilities: Vec +} + +impl HostInfo { + pub fn id(&self) -> &NodeId { + self.keys.public() + } + + pub fn secret(&self) -> &Secret { + self.keys.secret() + } + pub fn next_nonce(&mut self) -> H256 { + self.nonce = self.nonce.sha3(); + return self.nonce.clone(); + } +} + +enum ConnectionEntry { + Handshake(Handshake), + Session(Session) +} + +pub struct Host { + info: HostInfo, + udp_socket: UdpSocket, + listener: TcpListener, + connections: Slab, + timers: Slab, + nodes: HashMap, + handlers: HashMap>, + idle_timeout: Timeout, + channel: Sender, +} + +impl Host { + pub fn start(event_loop: &mut EventLoop) -> Result<(), Error> { + let config = NetworkConfiguration::new(); + /* + match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() { + Some(iface) => config.public_address = iface.addr.unwrap(), + None => warn!("No public network interface"), + } + */ + + let addr = config.listen_address; + // Setup the server socket + let listener = TcpListener::bind(&addr).unwrap(); + // Start listening for incoming connections + event_loop.register(&listener, Token(TCP_ACCEPT), EventSet::readable(), PollOpt::edge()).unwrap(); + // Setup the client socket + //let sock = TcpStream::connect(&addr).unwrap(); + // Register the socket + //self.event_loop.register(&sock, CLIENT, EventSet::readable(), PollOpt::edge()).unwrap(); + let idle_timeout = event_loop.timeout_ms(Token(IDLE), 1000).unwrap(); //TODO: check delay + // open the udp socket + let udp_socket = UdpSocket::bound(&addr).unwrap(); + event_loop.register(&udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap(); + event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap(); + let port = config.listen_address.port(); + + let mut host = Host { + info: HostInfo { + keys: KeyPair::create().unwrap(), + config: config, + nonce: H256::random(), + protocol_version: 4, + client_version: "parity".to_string(), + listen_port: port, + //capabilities: vec![ CapabilityInfo { protocol: "eth".to_string(), version: 63 }], + capabilities: Vec::new(), + }, + udp_socket: udp_socket, + listener: listener, + connections: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS), + timers: Slab::new_starting_at(Token(USER_TIMER), MAX_USER_TIMERS), + nodes: HashMap::new(), + handlers: HashMap::new(), + idle_timeout: idle_timeout, + channel: event_loop.channel(), + }; + + host.add_node("enode://c022e7a27affdd1632f2e67dffeb87f02bf506344bb142e08d12b28e7e5c6e5dbb8183a46a77bff3631b51c12e8cf15199f797feafdc8834aaf078ad1a2bcfa0@127.0.0.1:30303"); + host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@gav.ethdev.com:30300"); + host.add_node("enode://e58d5e26b3b630496ec640f2530f3e7fa8a8c7dfe79d9e9c4aac80e3730132b869c852d3125204ab35bb1b1951f6f2d40996c1034fd8c5a69b383ee337f02ddc@gav.ethdev.com:30303"); + host.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); + host.add_node("enode://7f25d3eab333a6b98a8b5ed68d962bb22c876ffcd5561fca54e3c2ef27f754df6f7fd7c9b74cc919067abac154fb8e1f8385505954f161ae440abc355855e034@54.207.93.166:30303"); + host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@92.51.165.126:30303"); + + try!(event_loop.run(&mut host)); + Ok(()) + } + + fn add_node(&mut self, id: &str) { + match Node::from_str(id) { + Err(e) => { warn!("Could not add node: {:?}", e); }, + Ok(n) => { + self.nodes.insert(n.id.clone(), n); + } + } + } + + fn maintain_network(&mut self, event_loop: &mut EventLoop) { + self.connect_peers(event_loop); + } + + fn have_session(&self, id: &NodeId) -> bool { + self.connections.iter().any(|e| match e { &ConnectionEntry::Session(ref s) => s.info.id.eq(&id), _ => false }) + } + + fn connecting_to(&self, id: &NodeId) -> bool { + self.connections.iter().any(|e| match e { &ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false }) + } + + fn connect_peers(&mut self, event_loop: &mut EventLoop) { + + struct NodeInfo { + id: NodeId, + peer_type: PeerType + } + + let mut to_connect: Vec = Vec::new(); + + let mut req_conn = 0; + //TODO: use nodes from discovery here + //for n in self.node_buckets.iter().flat_map(|n| &n.nodes).map(|id| NodeInfo { id: id.clone(), peer_type: self.nodes.get(id).unwrap().peer_type}) { + for n in self.nodes.values().map(|n| NodeInfo { id: n.id.clone(), peer_type: n.peer_type }) { + let connected = self.have_session(&n.id) || self.connecting_to(&n.id); + let required = n.peer_type == PeerType::Required; + if connected && required { + req_conn += 1; + } + else if !connected && (!self.info.config.pin || required) { + to_connect.push(n); + } + } + + for n in to_connect.iter() { + if n.peer_type == PeerType::Required { + if req_conn < IDEAL_PEERS { + self.connect_peer(&n.id, event_loop); + } + req_conn += 1; + } + } + + if !self.info.config.pin + { + let pending_count = 0; //TODO: + let peer_count = 0; + let mut open_slots = IDEAL_PEERS - peer_count - pending_count + req_conn; + if open_slots > 0 { + for n in to_connect.iter() { + if n.peer_type == PeerType::Optional && open_slots > 0 { + open_slots -= 1; + self.connect_peer(&n.id, event_loop); + } + } + } + } + } + + fn connect_peer(&mut self, id: &NodeId, event_loop: &mut EventLoop) { + if self.have_session(id) + { + warn!("Aborted connect. Node already connected."); + return; + } + if self.connecting_to(id) + { + warn!("Aborted connect. Node already connecting."); + return; + } + + let socket = { + let node = self.nodes.get_mut(id).unwrap(); + node.last_attempted = Some(::time::now()); + + + //blog(NetConnect) << "Attempting connection to node" << _p->id << "@" << ep << "from" << id(); + match TcpStream::connect(&node.endpoint.address) { + Ok(socket) => socket, + Err(_) => { + warn!("Cannot connect to node"); + return; + } + } + }; + + let nonce = self.info.next_nonce(); + match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(token, id, socket, &nonce).expect("Can't create handshake"))) { + Some(token) => { + match self.connections.get_mut(token) { + Some(&mut ConnectionEntry::Handshake(ref mut h)) => { + h.start(&self.info, true) + .and_then(|_| h.register(event_loop)) + .unwrap_or_else (|e| { + debug!(target: "net", "Handshake create error: {:?}", e); + }); + }, + _ => {} + } + }, + None => { warn!("Max connections reached") } + } + } + + + fn accept(&mut self, _event_loop: &mut EventLoop) { + warn!(target: "net", "accept"); + } + + fn connection_writable(&mut self, token: Token, event_loop: &mut EventLoop) { + let mut kill = false; + let mut create_session = false; + { + match self.connections.get_mut(token) { + Some(&mut ConnectionEntry::Handshake(ref mut h)) => { + h.writable(event_loop, &self.info).unwrap_or_else(|e| { + debug!(target: "net", "Handshake write error: {:?}", e); + kill = true; + }); + create_session = h.done(); + }, + Some(&mut ConnectionEntry::Session(ref mut s)) => { + s.writable(event_loop, &self.info).unwrap_or_else(|e| { + debug!(target: "net", "Session write error: {:?}", e); + kill = true; + }); + } + _ => { + warn!(target: "net", "Received event for unknown connection"); + } + }; + } + if kill { + self.kill_connection(token, event_loop); + } + if create_session { + self.start_session(token, event_loop); + } + } + + + fn connection_readable(&mut self, token: Token, event_loop: &mut EventLoop) { + let mut kill = false; + let mut create_session = false; + let mut ready_data: Vec = Vec::new(); + let mut packet_data: Option<(ProtocolId, PacketId, Vec)> = None; + { + match self.connections.get_mut(token) { + Some(&mut ConnectionEntry::Handshake(ref mut h)) => { + h.readable(event_loop, &self.info).unwrap_or_else(|e| { + debug!(target: "net", "Handshake read error: {:?}", e); + kill = true; + }); + create_session = h.done(); + }, + Some(&mut ConnectionEntry::Session(ref mut s)) => { + let sd = { s.readable(event_loop, &self.info).unwrap_or_else(|e| { + debug!(target: "net", "Session read error: {:?}", e); + kill = true; + SessionData::None + }) }; + match sd { + SessionData::Ready => { + for (p, _) in self.handlers.iter_mut() { + if s.have_capability(p) { + ready_data.push(p); + } + } + }, + SessionData::Packet { + data, + protocol, + packet_id, + } => { + match self.handlers.get_mut(protocol) { + None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) }, + Some(_) => packet_data = Some((protocol, packet_id, data)), + } + }, + SessionData::None => {}, + } + } + _ => { + warn!(target: "net", "Received event for unknown connection"); + } + }; + } + if kill { + self.kill_connection(token, event_loop); + } + if create_session { + self.start_session(token, event_loop); + } + for p in ready_data { + let mut h = self.handlers.get_mut(p).unwrap(); + h.connected(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize()); + } + if let Some((p, packet_id, data)) = packet_data { + let mut h = self.handlers.get_mut(p).unwrap(); + h.read(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize(), packet_id, &data[1..]); + } + + } + + fn start_session(&mut self, token: Token, event_loop: &mut EventLoop) { + let info = &self.info; + self.connections.replace_with(token, |c| { + match c { + ConnectionEntry::Handshake(h) => Session::new(h, event_loop, info) + .map(|s| Some(ConnectionEntry::Session(s))) + .unwrap_or_else(|e| { + debug!(target: "net", "Session construction error: {:?}", e); + None + }), + _ => { panic!("No handshake to create a session from"); } + } + }).expect("Error updating slab with session"); + } + + fn connection_timeout(&mut self, token: Token, event_loop: &mut EventLoop) { + self.kill_connection(token, event_loop) + } + fn kill_connection(&mut self, token: Token, _event_loop: &mut EventLoop) { + self.connections.remove(token); + } +} + +impl Handler for Host { + type Timeout = Token; + type Message = HostMessage; + + fn ready(&mut self, event_loop: &mut EventLoop, token: Token, events: EventSet) { + if events.is_readable() { + match token.as_usize() { + TCP_ACCEPT => self.accept(event_loop), + IDLE => self.maintain_network(event_loop), + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_readable(token, event_loop), + NODETABLE_RECEIVE => {}, + _ => panic!("Received unknown readable token"), + } + } + else if events.is_writable() { + match token.as_usize() { + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_writable(token, event_loop), + _ => panic!("Received unknown writable token"), + } + } + } + + fn timeout(&mut self, event_loop: &mut EventLoop, token: Token) { + match token.as_usize() { + IDLE => self.maintain_network(event_loop), + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, event_loop), + NODETABLE_DISCOVERY => {}, + NODETABLE_MAINTAIN => {}, + USER_TIMER ... LAST_USER_TIMER => { + let protocol = self.timers.get_mut(token).expect("Unknown user timer token").protocol; + match self.handlers.get_mut(protocol) { + None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) }, + Some(h) => h.timeout(&mut HostIo::new(protocol, None, event_loop, &mut self.connections, &mut self.timers), token.as_usize()), + } + } + _ => panic!("Unknown timer token"), + } + } + + fn notify(&mut self, event_loop: &mut EventLoop, msg: Self::Message) { + match msg { + HostMessage::Shutdown => event_loop.shutdown(), + HostMessage::AddHandler { + handler, + protocol, + versions + } => { + self.handlers.insert(protocol, handler); + for v in versions { + self.info.capabilities.push(CapabilityInfo { protocol: protocol, version: v, packet_count:0 }); + } + }, + HostMessage::Send { + peer, + packet_id, + protocol, + data, + } => { + match self.connections.get_mut(Token(peer as usize)) { + Some(&mut ConnectionEntry::Session(ref mut s)) => { + s.send_packet(protocol, packet_id as u8, &data).unwrap_or_else(|e| { + warn!(target: "net", "Send error: {:?}", e); + }); //TODO: don't copy vector data + }, + _ => { + warn!(target: "net", "Send: Peer does not exist"); + } + } + }, + HostMessage::UserMessage(message) => { + for (p, h) in self.handlers.iter_mut() { + if p != &message.protocol { + h.message(&mut HostIo::new(message.protocol, None, event_loop, &mut self.connections, &mut self.timers), &message); + } + } + } + } + } +}