diff --git a/src/crypto.rs b/src/crypto.rs index 6843a204b..0ffd0f876 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -48,10 +48,10 @@ impl From<::std::io::Error> for CryptoError { /// fn main() { /// let pair = KeyPair::create().unwrap(); /// let message = H256::random(); -/// let signature = sign(pair.secret(), &message).unwrap(); +/// let signature = ec::sign(pair.secret(), &message).unwrap(); /// -/// assert!(verify(pair.public(), &signature, &message).unwrap()); -/// assert_eq!(recover(&signature, &message).unwrap(), *pair.public()); +/// assert!(ec::verify(pair.public(), &signature, &message).unwrap()); +/// assert_eq!(ec::recover(&signature, &message).unwrap(), *pair.public()); /// } /// ``` pub struct KeyPair { @@ -200,6 +200,49 @@ pub mod ecies { Ok(msg) } + pub fn decrypt(secret: &Secret, encrypted: &[u8]) -> Result { + use ::rcrypto::digest::Digest; + use ::rcrypto::sha2::Sha256; + use ::rcrypto::hmac::Hmac; + use ::rcrypto::mac::Mac; + + let meta_len = encrypted.len() - (1 + 64 + 16 + 32); + if encrypted.len() < meta_len || encrypted[0] < 2 || encrypted[0] > 4 { + return Err(CryptoError::InvalidMessage); //invalid message: publickey + } + + let e = &encrypted[1..]; + let p = Public::from_slice(&e[0..64]); + let z = try!(ecdh::agree(secret, &p)); + let mut key = [0u8; 32]; + kdf(&z, &[0u8; 0], &mut key); + let ekey = &key[0..16]; + let mkey_material = &key[16..32]; + let mut hasher = Sha256::new(); + let mut mkey = [0u8; 32]; + hasher.input(mkey_material); + hasher.result(&mut mkey); + + let clen = encrypted.len() - meta_len; + let cypher_with_iv = &e[64..(64+16+clen)]; + let cypher_iv = &cypher_with_iv[0..16]; + let cypher_no_iv = &cypher_with_iv[16..]; + let msg_mac = &e[(64+16+clen)..]; + + // Verify tag + let mut hmac = Hmac::new(Sha256::new(), &mkey); + hmac.input(cypher_iv); + let mut mac = H256::new(); + hmac.raw_result(&mut mac); + if &mac[..] != msg_mac { + return Err(CryptoError::InvalidMessage); + } + + let mut msg = vec![0u8; clen]; + aes::decrypt(ekey, &H128::new(), cypher_no_iv, &mut msg[..]); + Ok(msg) + } + fn kdf(secret: &Secret, s1: &[u8], dest: &mut [u8]) { use ::rcrypto::digest::Digest; use ::rcrypto::sha2::Sha256; @@ -252,10 +295,10 @@ mod tests { fn test_signature() { let pair = KeyPair::create().unwrap(); let message = H256::random(); - let signature = sign(pair.secret(), &message).unwrap(); + let signature = ec::sign(pair.secret(), &message).unwrap(); - assert!(verify(pair.public(), &signature, &message).unwrap()); - assert_eq!(recover(&signature, &message).unwrap(), *pair.public()); + assert!(ec::verify(pair.public(), &signature, &message).unwrap()); + assert_eq!(ec::recover(&signature, &message).unwrap(), *pair.public()); } #[test] diff --git a/src/network/connection.rs b/src/network/connection.rs index c5deddf58..db5cde10d 100644 --- a/src/network/connection.rs +++ b/src/network/connection.rs @@ -1,9 +1,22 @@ -use std::io::{self, Cursor, Read}; -use mio::*; +#![allow(dead_code)] //TODO: remove this after everything is done +use mio::{Token, EventSet, EventLoop, Timeout, PollOpt, TryRead, TryWrite}; use mio::tcp::*; use hash::*; +use sha3::*; use bytes::*; -use network::host::Host; +use rlp::*; +use std::io::{self, Cursor, Read}; +use network::host::{Host}; +use network::Error; +use network::handshake::Handshake; +use crypto; +use rcrypto::blockmodes::*; +use rcrypto::aessafe::*; +use rcrypto::symmetriccipher::*; +use rcrypto::buffer::*; +use tiny_keccak::Keccak; + +const ENCRYPTED_HEADER_LEN: usize = 32; pub struct Connection { pub token: Token, @@ -19,6 +32,189 @@ pub enum WriteStatus { Complete } +enum EncryptedConnectionState { + Header, + Payload, +} + +pub struct EncryptedConnection { + connection: Connection, + encoder: CtrMode, + decoder: CtrMode, + mac_encoder: EcbEncryptor>, + egress_mac: Keccak, + ingress_mac: Keccak, + read_state: EncryptedConnectionState, + idle_timeout: Option, + protocol_id: u16, + payload_len: u32, +} + +impl EncryptedConnection { + pub fn new(handshake: Handshake) -> Result { + let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_public)); + let mut nonce_material = H512::new(); + if handshake.originated { + handshake.remote_nonce.copy_to(&mut nonce_material[0..32]); + handshake.nonce.copy_to(&mut nonce_material[32..64]); + } + else { + handshake.nonce.copy_to(&mut nonce_material[0..32]); + handshake.remote_nonce.copy_to(&mut nonce_material[32..64]); + } + let mut key_material = H512::new(); + shared.copy_to(&mut key_material[0..32]); + nonce_material.sha3_into(&mut key_material[32..64]); + key_material.sha3().copy_to(&mut key_material[32..64]); + + let iv = vec![0u8; 16]; + let encoder = CtrMode::new(AesSafe128Encryptor::new(&key_material[32..64]), iv); + let iv = vec![0u8; 16]; + let decoder = CtrMode::new(AesSafe128Encryptor::new(&key_material[32..64]), iv); + + key_material.sha3().copy_to(&mut key_material[32..64]); + let mac_encoder = EcbEncryptor::new(AesSafe128Encryptor::new(&key_material[32..64]), NoPadding); + + let mut egress_mac = Keccak::new_keccak256(); + let mut mac_material = &H256::from_slice(&key_material[32..64]) ^ &handshake.remote_nonce; + egress_mac.update(&mac_material); + egress_mac.update(if handshake.originated { &handshake.auth_cipher } else { &handshake.ack_cipher }); + + let mut ingress_mac = Keccak::new_keccak256(); + mac_material = &(&mac_material ^ &handshake.remote_nonce) ^ &handshake.nonce; + ingress_mac.update(&mac_material); + ingress_mac.update(if handshake.originated { &handshake.ack_cipher } else { &handshake.auth_cipher }); + + Ok(EncryptedConnection { + connection: handshake.connection, + encoder: encoder, + decoder: decoder, + mac_encoder: mac_encoder, + egress_mac: egress_mac, + ingress_mac: ingress_mac, + read_state: EncryptedConnectionState::Header, + idle_timeout: None, + protocol_id: 0, + payload_len: 0 + }) + } + + pub fn write_packet(&mut self, payload: &[u8]) -> Result<(), Error> { + let mut header = RlpStream::new(); + let len = payload.len() as usize; + header.append_raw(&[(len >> 16) as u8, (len >> 8) as u8, len as u8], 1); + header.append_raw(&[0xc2u8, 0x80u8, 0x80u8], 1); + //TODO: ger rid of vectors here + let mut header = header.out(); + let padding = (16 - (payload.len() % 16)) % 16; + header.resize(16, 0u8); + + let mut packet = vec![0u8; (32 + payload.len() + padding + 16)]; + self.encoder.encrypt(&mut RefReadBuffer::new(&header), &mut RefWriteBuffer::new(&mut packet), false).expect("Invalid length or padding"); + self.egress_mac.update(&packet[0..16]); + self.egress_mac.clone().finalize(&mut packet[16..32]); + self.encoder.encrypt(&mut RefReadBuffer::new(&payload), &mut RefWriteBuffer::new(&mut packet[32..(32 + len)]), padding == 0).expect("Invalid length or padding"); + if padding != 0 { + let pad = [08; 16]; + self.encoder.encrypt(&mut RefReadBuffer::new(&pad[0..padding]), &mut RefWriteBuffer::new(&mut packet[(32 + len)..(32 + len + padding)]), true).expect("Invalid length or padding"); + } + self.egress_mac.update(&packet[32..(32 + len + padding)]); + self.egress_mac.clone().finalize(&mut packet[(32 + len + padding)..]); + self.connection.send(&packet); + Ok(()) + } + + fn read_header(&mut self, header: &[u8]) -> Result<(), Error> { + if header.len() != ENCRYPTED_HEADER_LEN { + return Err(Error::Auth); + } + self.ingress_mac.update(header); + let mac = &header[16..]; + let mut expected = H128::new(); + self.ingress_mac.clone().finalize(&mut expected); + if mac != &expected[..] { + return Err(Error::Auth); + } + + let mut header_dec = H128::new(); + self.decoder.decrypt(&mut RefReadBuffer::new(&header[0..16]), &mut RefWriteBuffer::new(&mut header_dec), false).expect("Invalid length or padding"); + + let length = ((header[0] as u32) << 8 + header[1] as u32) << 8 + header[2] as u32; + let header_rlp = UntrustedRlp::new(&header[3..]); + let protocol_id = try!(u16::decode_untrusted(&try!(header_rlp.at(0)))); + + self.payload_len = length; + self.protocol_id = protocol_id; + self.read_state = EncryptedConnectionState::Payload; + + let padding = (16 - (length % 16)) % 16; + let full_length = length + padding + 16; + self.connection.expect(full_length as usize); + Ok(()) + } + + fn read_payload(&mut self, payload: &[u8]) -> Result { + let padding = (16 - (self.payload_len % 16)) % 16; + let full_length = (self.payload_len + padding + 16) as usize; + if payload.len() != full_length { + return Err(Error::Auth); + } + self.ingress_mac.update(&payload[0..payload.len() - 16]); + let mac = &payload[(payload.len() - 16)..]; + let mut expected = H128::new(); + self.ingress_mac.clone().finalize(&mut expected); + if mac != &expected[..] { + return Err(Error::Auth); + } + + let mut packet = vec![0u8; self.payload_len as usize]; + self.decoder.decrypt(&mut RefReadBuffer::new(&payload[0..(full_length - 16)]), &mut RefWriteBuffer::new(&mut packet), false).expect("Invalid length or padding"); + packet.resize(self.payload_len as usize, 0u8); + Ok(packet) + } + + pub fn readable(&mut self, event_loop: &mut EventLoop) -> Result, Error> { + self.idle_timeout.map(|t| event_loop.clear_timeout(t)); + try!(self.connection.reregister(event_loop)); + match self.read_state { + EncryptedConnectionState::Header => { + match try!(self.connection.readable()) { + Some(data) => { + try!(self.read_header(&data)); + }, + None => {} + }; + Ok(None) + }, + EncryptedConnectionState::Payload => { + match try!(self.connection.readable()) { + Some(data) => { + self.read_state = EncryptedConnectionState::Header; + self.connection.expect(ENCRYPTED_HEADER_LEN); + Ok(Some(try!(self.read_payload(&data)))) + }, + None => Ok(None) + } + } + } + } + + pub fn writable(&mut self, event_loop: &mut EventLoop) -> Result<(), Error> { + self.idle_timeout.map(|t| event_loop.clear_timeout(t)); + try!(self.connection.writable()); + try!(self.connection.reregister(event_loop)); + Ok(()) + } + + pub fn register(&mut self, event_loop: &mut EventLoop) -> Result<(), Error> { + self.connection.expect(ENCRYPTED_HEADER_LEN); + self.idle_timeout.map(|t| event_loop.clear_timeout(t)); + self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok(); + try!(self.connection.register(event_loop)); + Ok(()) + } +} + impl Connection { pub fn new(token: Token, socket: TcpStream) -> Connection { Connection { @@ -35,11 +231,12 @@ impl Connection { if self.rec_size != self.rec_buf.len() { warn!(target:"net", "Unexpected connection read start"); } - unsafe { self.rec_buf.set_len(size) } + unsafe { self.rec_buf.set_len(0) } self.rec_size = size; } - pub fn readable(&mut self) -> io::Result> { + //TODO: return a slice + pub fn readable(&mut self) -> io::Result> { if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size { warn!(target:"net", "Unexpected connection read"); } @@ -47,7 +244,10 @@ impl Connection { // resolve "multiple applicable items in scope [E0034]" error let sock_ref = ::by_ref(&mut self.socket); match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) { - Ok(Some(_)) if self.rec_buf.len() == self.rec_size => Ok(Some(&self.rec_buf[0..self.rec_size])), + Ok(Some(_)) if self.rec_buf.len() == self.rec_size => { + self.rec_size = 0; + Ok(Some(::std::mem::replace(&mut self.rec_buf, Bytes::new()))) + }, Ok(_) => Ok(None), Err(e) => Err(e), } @@ -107,3 +307,4 @@ impl Connection { } } + diff --git a/src/network/handshake.rs b/src/network/handshake.rs index ef1cf418b..186d6b8a7 100644 --- a/src/network/handshake.rs +++ b/src/network/handshake.rs @@ -1,6 +1,7 @@ use mio::*; use mio::tcp::*; use hash::*; +use bytes::Bytes; use crypto::*; use crypto; use network::connection::{Connection, WriteStatus}; @@ -14,8 +15,6 @@ enum HandshakeState { WritingAuth, ReadingAck, WritingAck, - WritingHello, - ReadingHello, StartSession, } @@ -23,34 +22,46 @@ pub struct Handshake { pub id: NodeId, pub connection: Connection, state: HandshakeState, + pub originated: bool, idle_timeout: Option, - ecdhe: KeyPair, - nonce: H256, - remote_public: Public, - remote_nonce: H256 + pub ecdhe: KeyPair, + pub nonce: H256, + pub remote_public: Public, + pub remote_nonce: H256, + pub auth_cipher: Bytes, + pub ack_cipher: Bytes } +const AUTH_PACKET_SIZE:usize = 307; +const ACK_PACKET_SIZE:usize = 210; + impl Handshake { pub fn new(token: Token, id: &NodeId, socket: TcpStream, nonce: &H256) -> Result { Ok(Handshake { id: id.clone(), connection: Connection::new(token, socket), + originated: false, state: HandshakeState::New, idle_timeout: None, ecdhe: try!(KeyPair::create()), nonce: nonce.clone(), remote_public: Public::new(), - remote_nonce: H256::new() + remote_nonce: H256::new(), + auth_cipher: Bytes::new(), + ack_cipher: Bytes::new(), }) } - pub fn start(&mut self, host: &HostInfo, originated: bool) { + pub fn start(&mut self, host: &HostInfo, originated: bool) -> Result<(), Error> { + self.originated = originated; if originated { - self.write_auth(host); + try!(self.write_auth(host)); } else { - self.read_auth(); + self.state = HandshakeState::ReadingAuth; + self.connection.expect(AUTH_PACKET_SIZE); }; + Ok(()) } pub fn done(&self) -> bool { @@ -58,88 +69,138 @@ impl Handshake { } pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result<(), Error> { - self.idle_timeout.map(|t| event_loop.clear_timeout(t)); - Ok(()) - } - - pub fn writable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result<(), Error> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); match self.state { - HandshakeState::WritingAuth => { - match (try!(self.connection.writable())) { - WriteStatus::Complete => { try!(self.read_ack()); }, - _ => {} + HandshakeState::ReadingAuth => { + match try!(self.connection.readable()) { + Some(data) => { + try!(self.read_auth(host, &data)); + try!(self.write_ack()); + }, + None => {} }; - try!(self.connection.reregister(event_loop)); }, - HandshakeState::WritingAck => { - match (try!(self.connection.writable())) { - WriteStatus::Complete => { try!(self.read_hello()); }, - _ => {} - }; - try!(self.connection.reregister(event_loop)); - }, - HandshakeState::WritingHello => { - match (try!(self.connection.writable())) { - WriteStatus::Complete => { self.state = HandshakeState::StartSession; }, - _ => { try!(self.connection.reregister(event_loop)); } + HandshakeState::ReadingAck => { + match try!(self.connection.readable()) { + Some(data) => { + try!(self.read_ack(host, &data)); + self.state = HandshakeState::StartSession; + }, + None => {} }; }, _ => { panic!("Unexpected state") } } + try!(self.connection.reregister(event_loop)); + Ok(()) + } + + pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), Error> { + self.idle_timeout.map(|t| event_loop.clear_timeout(t)); + match self.state { + HandshakeState::WritingAuth => { + match try!(self.connection.writable()) { + WriteStatus::Complete => { + self.connection.expect(ACK_PACKET_SIZE); + self.state = HandshakeState::ReadingAck; + }, + _ => {} + }; + }, + HandshakeState::WritingAck => { + match try!(self.connection.writable()) { + WriteStatus::Complete => { + self.connection.expect(32); + self.state = HandshakeState::StartSession; + }, + _ => {} + }; + }, + _ => { panic!("Unexpected state") } + } + try!(self.connection.reregister(event_loop)); Ok(()) } pub fn register(&mut self, event_loop: &mut EventLoop) -> Result<(), Error> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok(); - self.connection.register(event_loop); + try!(self.connection.register(event_loop)); Ok(()) } - fn read_auth(&mut self) -> Result<(), Error> { - Ok(()) + fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), Error> { + trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); + assert!(data.len() == AUTH_PACKET_SIZE); + self.auth_cipher = data.to_vec(); + let auth = try!(ecies::decrypt(host.secret(), data)); + let (sig, rest) = auth.split_at(65); + let (hepubk, rest) = rest.split_at(32); + let (pubk, rest) = rest.split_at(64); + let (nonce, _) = rest.split_at(32); + self.remote_public.clone_from_slice(pubk); + self.remote_nonce.clone_from_slice(nonce); + let shared = try!(ecdh::agree(host.secret(), &self.remote_public)); + let signature = ec::Signature::from_slice(sig); + let spub = try!(ec::recover(&signature, &(&shared ^ &self.remote_nonce))); + if &spub.sha3()[..] != hepubk { + trace!(target:"net", "Handshake hash mismath with {:?}", self.connection.socket.peer_addr()); + return Err(Error::Auth); + }; + self.write_ack() } - fn read_ack(&mut self) -> Result<(), Error> { - Ok(()) - } - - fn read_hello(&mut self) -> Result<(), Error> { + fn read_ack(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), Error> { + trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); + assert!(data.len() == ACK_PACKET_SIZE); + self.ack_cipher = data.to_vec(); + let ack = try!(ecies::decrypt(host.secret(), data)); + let (pubk, nonce) = ack.split_at(65); + self.remote_public.clone_from_slice(pubk); + self.remote_nonce.clone_from_slice(nonce); Ok(()) } fn write_auth(&mut self, host: &HostInfo) -> Result<(), Error> { - trace!(target:"net", "Sending auth to {:?}", self.connection.socket.peer_addr()); + trace!(target:"net", "Sending handshake auth to {:?}", self.connection.socket.peer_addr()); let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants let len = data.len(); { data[len - 1] = 0x0; let (sig, rest) = data.split_at_mut(65); let (hepubk, rest) = rest.split_at_mut(32); - let (mut pubk, rest) = rest.split_at_mut(64); - let (nonce, rest) = rest.split_at_mut(32); + let (pubk, rest) = rest.split_at_mut(64); + let (nonce, _) = rest.split_at_mut(32); // E(remote-pubk, S(ecdhe-random, ecdh-shared-secret^nonce) || H(ecdhe-random-pubk) || pubk || nonce || 0x0) let shared = try!(crypto::ecdh::agree(host.secret(), &self.id)); - let signature = try!(crypto::ec::sign(self.ecdhe.secret(), &(&shared ^ &self.nonce))).copy_to(sig); + try!(crypto::ec::sign(self.ecdhe.secret(), &(&shared ^ &self.nonce))).copy_to(sig); self.ecdhe.public().sha3_into(hepubk); - host.id().copy_to(&mut pubk); + host.id().copy_to(pubk); self.nonce.copy_to(nonce); } let message = try!(crypto::ecies::encrypt(&self.id, &data)); self.connection.send(&message[..]); + self.auth_cipher = message; self.state = HandshakeState::WritingAuth; Ok(()) } fn write_ack(&mut self) -> Result<(), Error> { + trace!(target:"net", "Sending handshake ack to {:?}", self.connection.socket.peer_addr()); + let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants + let len = data.len(); + { + data[len - 1] = 0x0; + let (epubk, rest) = data.split_at_mut(64); + let (nonce, _) = rest.split_at_mut(32); + self.ecdhe.public().copy_to(epubk); + self.nonce.copy_to(nonce); + } + let message = try!(crypto::ecies::encrypt(&self.id, &data)); + self.connection.send(&message[..]); + self.ack_cipher = message; + self.state = HandshakeState::WritingAck; Ok(()) } - - fn write_hello(&mut self) -> Result<(), Error> { - Ok(()) - } - - } diff --git a/src/network/host.rs b/src/network/host.rs index d436fae5f..cdb6dd132 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -1,6 +1,6 @@ #![allow(dead_code)] //TODO: remove this after everything is done //TODO: remove all unwraps -use std::net::{SocketAddr, AddrParseError}; +use std::net::{SocketAddr, ToSocketAddrs}; use std::collections::{HashSet, HashMap, BTreeMap}; use std::hash::{Hash, Hasher}; use std::cell::{RefCell}; @@ -13,9 +13,9 @@ use mio::udp::*; use hash::*; use crypto::*; use time::Tm; -use error::EthcoreError; -use network::connection::Connection; use network::handshake::Handshake; +use network::session::Session; +use network::Error; const DEFAULT_PORT: u16 = 30303; @@ -65,32 +65,20 @@ impl NodeEndpoint { udp_port: address.port() } } - fn from_str(address: &str) -> Result { - let address = try!(SocketAddr::from_str(address)); - Ok(NodeEndpoint { - address: address, - udp_port: address.port() - }) + fn from_str(s: &str) -> Result { + println!("{:?}", s); + let address = s.to_socket_addrs().map(|mut i| i.next()); + match address { + Ok(Some(a)) => Ok(NodeEndpoint { + address: a, + udp_port: a.port() + }), + Ok(_) => Err(Error::AddressResolve(None)), + Err(e) => Err(Error::AddressResolve(Some(e))) + } } } -#[derive(Debug)] -pub enum AddressError { - AddrParseError(AddrParseError), - NodeIdParseError(EthcoreError) -} - -impl From for AddressError { - fn from(err: AddrParseError) -> AddressError { - AddressError::AddrParseError(err) - } -} -impl From for AddressError { - fn from(err: EthcoreError) -> AddressError { - AddressError::NodeIdParseError(err) - } -} - #[derive(PartialEq, Eq, Copy, Clone)] enum PeerType { Required, @@ -106,10 +94,10 @@ struct Node { } impl FromStr for Node { - type Err = AddressError; + type Err = Error; fn from_str(s: &str) -> Result { - let (id, endpoint) = if &s[..8] == "enode://" && s.len() > 136 && &s[136..137] == "@" { - (try!(NodeId::from_str(&s[8..128])), try!(NodeEndpoint::from_str(&s[137..]))) + let (id, endpoint) = if &s[0..8] == "enode://" && s.len() > 136 && &s[136..137] == "@" { + (try!(NodeId::from_str(&s[8..136])), try!(NodeEndpoint::from_str(&s[137..]))) } else { (NodeId::new(), try!(NodeEndpoint::from_str(s))) @@ -164,11 +152,6 @@ impl NodeBucket { } } -struct Peer { - id: NodeId, - connection: Connection, -} - struct FindNodePacket; impl FindNodePacket { @@ -190,8 +173,6 @@ const NODETABLE_MAINTAIN: usize = 5; const NODETABLE_DISCOVERY: usize = 6; const FIRST_CONNECTION: usize = 7; const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1; -const FIRST_HANDSHAKE: usize = FIRST_CONNECTION + MAX_CONNECTIONS; -const LAST_HANDSHAKE: usize = FIRST_HANDSHAKE + MAX_CONNECTIONS - 1; pub enum HostMessage { Shutdown @@ -217,13 +198,17 @@ impl HostInfo { } } +enum ConnectionEntry { + Handshake(Handshake), + Session(Session) +} + pub struct Host { info: HostInfo, sender: Sender, udp_socket: UdpSocket, listener: TcpListener, - peers: Slab, - connecting: Slab, + connections: Slab, discovery_round: u16, discovery_id: NodeId, discovery_nodes: HashSet, @@ -269,8 +254,7 @@ impl Host { sender: sender, udp_socket: udp_socket, listener: listener, - peers: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS), - connecting: Slab::new_starting_at(Token(FIRST_HANDSHAKE), MAX_CONNECTIONS), + connections: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS), discovery_round: 0, discovery_id: NodeId::new(), discovery_nodes: HashSet::new(), @@ -281,10 +265,10 @@ impl Host { host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@gav.ethdev.com:30300"); - host.add_node("enode://e58d5e26b3b630496ec640f2530f3e7fa8a8c7dfe79d9e9c4aac80e3730132b869c852d3125204ab35bb1b1951f6f2d40996c1034fd8c5a69b383ee337f02dd@gav.ethdev.com:30303"); - host.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163@52.16.188.185:30303"); - host.add_node("enode://7f25d3eab333a6b98a8b5ed68d962bb22c876ffcd5561fca54e3c2ef27f754df6f7fd7c9b74cc919067abac154fb8e1f8385505954f161ae440abc355855e03@54.207.93.166:30303"); - host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254@92.51.165.126:30303"); + host.add_node("enode://e58d5e26b3b630496ec640f2530f3e7fa8a8c7dfe79d9e9c4aac80e3730132b869c852d3125204ab35bb1b1951f6f2d40996c1034fd8c5a69b383ee337f02ddc@gav.ethdev.com:30303"); + host.add_node("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@52.16.188.185:30303"); + host.add_node("enode://7f25d3eab333a6b98a8b5ed68d962bb22c876ffcd5561fca54e3c2ef27f754df6f7fd7c9b74cc919067abac154fb8e1f8385505954f161ae440abc355855e034@54.207.93.166:30303"); + host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@92.51.165.126:30303"); event_loop.run(&mut host).unwrap(); } @@ -299,7 +283,10 @@ impl Host { fn add_node(&mut self, id: &str) { match Node::from_str(id) { Err(e) => { warn!("Could not add node: {:?}", e); }, - Ok(n) => { self.nodes.insert(n.id.clone(), n); } + Ok(n) => { + self.node_buckets[Host::distance(self.info.id(), &n.id) as usize].nodes.push(n.id.clone()); + self.nodes.insert(n.id.clone(), n); + } } } @@ -343,13 +330,7 @@ impl Host { } fn distance(a: &NodeId, b: &NodeId) -> u32 { - //TODO: - //u256 d = sha3(_a) ^ sha3(_b); - let mut d: NodeId = NodeId::new(); - for i in 0..32 { - d[i] = a[i] ^ b[i]; - } - + let d = a.sha3() ^ b.sha3(); let mut ret:u32 = 0; for i in 0..32 { let mut v: u8 = d[i]; @@ -448,11 +429,11 @@ impl Host { } fn have_session(&self, id: &NodeId) -> bool { - self.peers.iter().any(|h| h.id.eq(&id)) + self.connections.iter().any(|e| match e { &ConnectionEntry::Session(ref s) => s.id.eq(&id), _ => false }) } fn connecting_to(&self, id: &NodeId) -> bool { - self.connecting.iter().any(|h| h.id.eq(&id)) + self.connections.iter().any(|e| match e { &ConnectionEntry::Handshake(ref h) => h.id.eq(&id), _ => false }) } fn connect_peers(&mut self, event_loop: &mut EventLoop) { @@ -529,10 +510,18 @@ impl Host { }; let nonce = self.info.next_nonce(); - match self.connecting.insert_with(|token| Handshake::new(token, id, socket, &nonce).expect("Can't create handshake")) { + match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(token, id, socket, &nonce).expect("Can't create handshake"))) { Some(token) => { - self.connecting[token].register(event_loop).expect("Handshake token regisration failed"); - self.connecting[token].start(&self.info, true); + match self.connections.get_mut(token) { + Some(&mut ConnectionEntry::Handshake(ref mut h)) => { + h.start(&self.info, true) + .and_then(|_| h.register(event_loop)) + .unwrap_or_else (|e| { + debug!(target: "net", "Handshake create error: {:?}", e); + }); + }, + _ => {} + } }, None => { warn!("Max connections reached") } } @@ -543,57 +532,86 @@ impl Host { warn!(target: "net", "accept"); } - fn handshake_writable(&mut self, token: Token, event_loop: &mut EventLoop) { - if !{ - let handshake = match self.connecting.get_mut(token) { - Some(h) => h, - None => { - warn!(target: "net", "Received event for unknown handshake"); - return; + fn connection_writable(&mut self, token: Token, event_loop: &mut EventLoop) { + let mut kill = false; + let mut create_session = false; + { + match self.connections.get_mut(token) { + Some(&mut ConnectionEntry::Handshake(ref mut h)) => { + h.writable(event_loop, &self.info).unwrap_or_else(|e| { + debug!(target: "net", "Handshake write error: {:?}", e); + kill = true; + }); + create_session = h.done(); + }, + Some(&mut ConnectionEntry::Session(ref mut s)) => { + s.writable(event_loop, &self.info).unwrap_or_else(|e| { + debug!(target: "net", "Session write error: {:?}", e); + kill = true; + }); + } + _ => { + warn!(target: "net", "Received event for unknown connection"); } }; - match handshake.writable(event_loop, &self.info) { - Err(e) => { - debug!(target: "net", "Handshake read error: {:?}", e); - false - }, - Ok(_) => true - } - } { - self.kill_handshake(token, event_loop); + } + if kill { + self.kill_connection(token, event_loop); + } + if create_session { + self.start_session(token, event_loop); } } - fn handshake_readable(&mut self, token: Token, event_loop: &mut EventLoop) { - if !{ - let handshake = match self.connecting.get_mut(token) { - Some(h) => h, - None => { - warn!(target: "net", "Received event for unknown handshake"); - return; + fn connection_readable(&mut self, token: Token, event_loop: &mut EventLoop) { + let mut kill = false; + let mut create_session = false; + { + match self.connections.get_mut(token) { + Some(&mut ConnectionEntry::Handshake(ref mut h)) => { + h.readable(event_loop, &self.info).unwrap_or_else(|e| { + debug!(target: "net", "Handshake read error: {:?}", e); + kill = true; + }); + create_session = h.done(); + }, + Some(&mut ConnectionEntry::Session(ref mut s)) => { + s.readable(event_loop, &self.info).unwrap_or_else(|e| { + debug!(target: "net", "Session read error: {:?}", e); + kill = true; + }); + } + _ => { + warn!(target: "net", "Received event for unknown connection"); } }; - match handshake.writable(event_loop, &self.info) { - Err(e) => { - debug!(target: "net", "Handshake read error: {:?}", e); - false - }, - Ok(_) => true - } - } { - self.kill_handshake(token, event_loop); + } + if kill { + self.kill_connection(token, event_loop); + } + if create_session { + self.start_session(token, event_loop); } } - fn handshake_timeout(&mut self, token: Token, event_loop: &mut EventLoop) { - self.kill_handshake(token, event_loop) - } - fn kill_handshake(&mut self, token: Token, _event_loop: &mut EventLoop) { - self.connecting.remove(token); + + fn start_session(&mut self, token: Token, event_loop: &mut EventLoop) { + self.connections.replace_with(token, |c| { + match c { + ConnectionEntry::Handshake(h) => Session::new(h, event_loop) + .map(|s| Some(ConnectionEntry::Session(s))) + .unwrap_or_else(|e| { + debug!(target: "net", "Session construction error: {:?}", e); + None + }), + _ => { panic!("No handshake to create a session from"); } + } + }).expect("Error updating slab with session"); } - fn read_connection(&mut self, _token: Token, _event_loop: &mut EventLoop) { + fn connection_timeout(&mut self, token: Token, event_loop: &mut EventLoop) { + self.kill_connection(token, event_loop) } - - fn write_connection(&mut self, _token: Token, _event_loop: &mut EventLoop) { + fn kill_connection(&mut self, token: Token, _event_loop: &mut EventLoop) { + self.connections.remove(token); } } @@ -606,16 +624,14 @@ impl Handler for Host { match token.as_usize() { TCP_ACCEPT => self.accept(event_loop), IDLE => self.maintain_network(event_loop), - FIRST_CONNECTION ... LAST_CONNECTION => self.read_connection(token, event_loop), - FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.handshake_readable(token, event_loop), + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_readable(token, event_loop), NODETABLE_RECEIVE => {}, _ => panic!("Received unknown readable token"), } } else if events.is_writable() { match token.as_usize() { - FIRST_CONNECTION ... LAST_CONNECTION => self.write_connection(token, event_loop), - FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.handshake_writable(token, event_loop), + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_writable(token, event_loop), _ => panic!("Received unknown writable token"), } } @@ -624,7 +640,7 @@ impl Handler for Host { fn timeout(&mut self, event_loop: &mut EventLoop, token: Token) { match token.as_usize() { IDLE => self.maintain_network(event_loop), - FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.handshake_timeout(token, event_loop), + FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, event_loop), NODETABLE_DISCOVERY => {}, NODETABLE_MAINTAIN => {}, _ => panic!("Received unknown timer token"), @@ -636,9 +652,11 @@ impl Handler for Host { #[cfg(test)] mod tests { use network::host::Host; + use env_logger; #[test] - #[ignore] + //#[ignore] fn net_connect() { + env_logger::init().unwrap(); let _ = Host::start(); } } diff --git a/src/network/mod.rs b/src/network/mod.rs index b8682dd12..d1f9940c4 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -1,13 +1,18 @@ extern crate mio; -pub mod host; -pub mod connection; -pub mod handshake; - +mod host; +mod connection; +mod handshake; +mod session; #[derive(Debug)] pub enum Error { Crypto(::crypto::CryptoError), Io(::std::io::Error), + Auth, + BadProtocol, + AddressParse(::std::net::AddrParseError), + AddressResolve(Option<::std::io::Error>), + NodeIdParse(::error::EthcoreError), } impl From<::std::io::Error> for Error { @@ -21,4 +26,23 @@ impl From<::crypto::CryptoError> for Error { Error::Crypto(err) } } +impl From<::std::net::AddrParseError> for Error { + fn from(err: ::std::net::AddrParseError) -> Error { + Error::AddressParse(err) + } +} +impl From<::error::EthcoreError> for Error { + fn from(err: ::error::EthcoreError) -> Error { + Error::NodeIdParse(err) + } +} +impl From<::rlp::DecoderError> for Error { + fn from(_err: ::rlp::DecoderError) -> Error { + Error::Auth + } +} +pub fn start_host() +{ + let _ = host::Host::start(); +} diff --git a/src/network/session.rs b/src/network/session.rs new file mode 100644 index 000000000..71554b0ca --- /dev/null +++ b/src/network/session.rs @@ -0,0 +1,33 @@ +#![allow(dead_code)] //TODO: remove this after everything is done +//TODO: remove all unwraps +use mio::*; +use hash::*; +use network::connection::{EncryptedConnection}; +use network::handshake::Handshake; +use network::Error; +use network::host::*; + +pub struct Session { + pub id: NodeId, + connection: EncryptedConnection, +} + +impl Session { + pub fn new(h: Handshake, event_loop: &mut EventLoop) -> Result { + let id = h.id.clone(); + let mut connection = try!(EncryptedConnection::new(h)); + try!(connection.register(event_loop)); + Ok(Session { + id: id, + connection: connection, + }) + } + pub fn readable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), Error> { + try!(self.connection.readable(event_loop)); + Ok(()) + } + pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), Error> { + self.connection.writable(event_loop) + } +} + diff --git a/src/sha3.rs b/src/sha3.rs index a8b5c7c44..8ceedb15f 100644 --- a/src/sha3.rs +++ b/src/sha3.rs @@ -19,7 +19,9 @@ impl Hashable for T where T: BytesConvertable { } } fn sha3_into(&self, dest: &mut [u8]) { - keccak_256(self.bytes(), dest); + let mut keccak = Keccak::new_keccak256(); + keccak.update(self.bytes()); + keccak.finalize(dest); } }