From 4873c33fefa8ec7f116e2955ff9273f775e536b7 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 30 Nov 2015 16:38:55 +0100 Subject: [PATCH] Encryption primitives; Continue work on handshake --- Cargo.toml | 2 +- src/crypto.rs | 182 +++++++++++++++++++++++++------- src/hash.rs | 34 +++++- src/lib.rs | 2 +- src/network/connection.rs | 109 ++++++++++++++++++++ src/network/handshake.rs | 145 ++++++++++++++++++++++++++ src/network/host.rs | 211 ++++++++++++++++++++------------------ src/network/mod.rs | 22 ++++ src/sha3.rs | 4 + 9 files changed, 571 insertions(+), 140 deletions(-) create mode 100644 src/network/connection.rs create mode 100644 src/network/handshake.rs diff --git a/Cargo.toml b/Cargo.toml index 51e9f4470..170ac2990 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,5 +18,5 @@ tiny-keccak = "0.3" rocksdb = "0.2.1" num = "0.1" lazy_static = "0.1.*" -secp256k1 = "0.5.1" +eth-secp256k1 = { git = "https://github.com/arkpar/rust-secp256k1.git" } rust-crypto = "0.2.34" diff --git a/src/crypto.rs b/src/crypto.rs index 0d1ae9c05..6843a204b 100644 --- a/src/crypto.rs +++ b/src/crypto.rs @@ -5,7 +5,8 @@ use rand::os::OsRng; pub type Secret=H256; pub type Public=H512; -pub type Signature=H520; + +pub use ::sha3::Hashable; #[derive(Debug)] pub enum CryptoError { @@ -94,47 +95,152 @@ impl KeyPair { } } -/// Recovers Public key from signed message hash. -pub fn recover(signature: &Signature, message: &H256) -> Result { - use secp256k1::*; - let context = Secp256k1::new(); - let rsig = try!(RecoverableSignature::from_compact(&context, &signature[0..64], try!(RecoveryId::from_i32(signature[64] as i32)))); - let publ = try!(context.recover(&try!(Message::from_slice(&message)), &rsig)); - let serialized = publ.serialize_vec(&context, false); - let p: Public = Public::from_slice(&serialized[1..65]); - Ok(p) -} -/// Returns siganture of message hash. -pub fn sign(secret: &Secret, message: &H256) -> Result { - use secp256k1::*; - let context = Secp256k1::new(); - let sec: &key::SecretKey = unsafe { ::std::mem::transmute(secret) }; - let s = try!(context.sign_recoverable(&try!(Message::from_slice(&message)), sec)); - let (rec_id, data) = s.serialize_compact(&context); - let mut signature: ::crypto::Signature = unsafe { ::std::mem::uninitialized() }; - signature.clone_from_slice(&data); - signature[64] = rec_id.to_i32() as u8; - Ok(signature) -} -/// Verify signature. -pub fn verify(public: &Public, signature: &Signature, message: &H256) -> Result { - use secp256k1::*; - let context = Secp256k1::new(); - let rsig = try!(RecoverableSignature::from_compact(&context, &signature[0..64], try!(RecoveryId::from_i32(signature[64] as i32)))); - let sig = rsig.to_standard(&context); +pub mod ec { + use hash::*; + use crypto::*; - let mut pdata: [u8; 65] = [4u8; 65]; - let ptr = pdata[1..].as_mut_ptr(); - let src = public.as_ptr(); - unsafe { ::std::ptr::copy_nonoverlapping(src, ptr, 64) }; - let publ = try!(key::PublicKey::from_slice(&context, &pdata)); - match context.verify(&try!(Message::from_slice(&message)), &sig, &publ) { - Ok(_) => Ok(true), - Err(Error::IncorrectSignature) => Ok(false), - Err(x) => Err(>::from(x)) + pub type Signature = H520; + /// Recovers Public key from signed message hash. + pub fn recover(signature: &Signature, message: &H256) -> Result { + use secp256k1::*; + let context = Secp256k1::new(); + let rsig = try!(RecoverableSignature::from_compact(&context, &signature[0..64], try!(RecoveryId::from_i32(signature[64] as i32)))); + let publ = try!(context.recover(&try!(Message::from_slice(&message)), &rsig)); + let serialized = publ.serialize_vec(&context, false); + let p: Public = Public::from_slice(&serialized[1..65]); + Ok(p) + } + /// Returns siganture of message hash. + pub fn sign(secret: &Secret, message: &H256) -> Result { + use secp256k1::*; + let context = Secp256k1::new(); + let sec: &key::SecretKey = unsafe { ::std::mem::transmute(secret) }; + let s = try!(context.sign_recoverable(&try!(Message::from_slice(&message)), sec)); + let (rec_id, data) = s.serialize_compact(&context); + let mut signature: ec::Signature = unsafe { ::std::mem::uninitialized() }; + signature.clone_from_slice(&data); + signature[64] = rec_id.to_i32() as u8; + Ok(signature) + } + /// Verify signature. + pub fn verify(public: &Public, signature: &Signature, message: &H256) -> Result { + use secp256k1::*; + let context = Secp256k1::new(); + let rsig = try!(RecoverableSignature::from_compact(&context, &signature[0..64], try!(RecoveryId::from_i32(signature[64] as i32)))); + let sig = rsig.to_standard(&context); + + let mut pdata: [u8; 65] = [4u8; 65]; + let ptr = pdata[1..].as_mut_ptr(); + let src = public.as_ptr(); + unsafe { ::std::ptr::copy_nonoverlapping(src, ptr, 64) }; + let publ = try!(key::PublicKey::from_slice(&context, &pdata)); + match context.verify(&try!(Message::from_slice(&message)), &sig, &publ) { + Ok(_) => Ok(true), + Err(Error::IncorrectSignature) => Ok(false), + Err(x) => Err(>::from(x)) + } } } +pub mod ecdh { + use crypto::*; + + pub fn agree(secret: &Secret, public: &Public, ) -> Result { + use secp256k1::*; + let context = Secp256k1::new(); + let mut pdata: [u8; 65] = [4u8; 65]; + let ptr = pdata[1..].as_mut_ptr(); + let src = public.as_ptr(); + unsafe { ::std::ptr::copy_nonoverlapping(src, ptr, 64) }; + let publ = try!(key::PublicKey::from_slice(&context, &pdata)); + let sec: &key::SecretKey = unsafe { ::std::mem::transmute(secret) }; + let shared = ecdh::SharedSecret::new_raw(&context, &publ, &sec); + let s: Secret = unsafe { ::std::mem::transmute(shared) }; + Ok(s) + } +} + +pub mod ecies { + use hash::*; + use bytes::*; + use crypto::*; + + pub fn encrypt(public: &Public, plain: &[u8]) -> Result { + use ::rcrypto::digest::Digest; + use ::rcrypto::sha2::Sha256; + use ::rcrypto::hmac::Hmac; + use ::rcrypto::mac::Mac; + let r = try!(KeyPair::create()); + let z = try!(ecdh::agree(r.secret(), public)); + let mut key = [0u8; 32]; + let mut mkey = [0u8; 32]; + kdf(&z, &[0u8; 0], &mut key); + let mut hasher = Sha256::new(); + let mkey_material = &key[16..32]; + hasher.input(mkey_material); + hasher.result(&mut mkey); + let ekey = &key[0..16]; + + let mut msg = vec![0u8; (1 + 64 + 16 + plain.len() + 32)]; + msg[0] = 0x04u8; + { + let msgd = &mut msg[1..]; + r.public().copy_to(&mut msgd[0..64]); + { + let cipher = &mut msgd[(64 + 16)..(64 + 16 + plain.len())]; + aes::encrypt(ekey, &H128::new(), plain, cipher); + } + let mut hmac = Hmac::new(Sha256::new(), &mkey); + { + let cipher_iv = &msgd[64..(64 + 16 + plain.len())]; + hmac.input(cipher_iv); + } + hmac.raw_result(&mut msgd[(64 + 16 + plain.len())..]); + } + Ok(msg) + } + + fn kdf(secret: &Secret, s1: &[u8], dest: &mut [u8]) { + use ::rcrypto::digest::Digest; + use ::rcrypto::sha2::Sha256; + let mut hasher = Sha256::new(); + // SEC/ISO/Shoup specify counter size SHOULD be equivalent + // to size of hash output, however, it also notes that + // the 4 bytes is okay. NIST specifies 4 bytes. + let mut ctr = 1u32; + let mut written = 0usize; + while written < dest.len() { + let ctrs = [(ctr >> 24) as u8, (ctr >> 16) as u8, (ctr >> 8) as u8, ctr as u8]; + hasher.input(&ctrs); + hasher.input(secret); + hasher.input(s1); + hasher.result(&mut dest[written..(written + 32)]); + hasher.reset(); + written += 32; + ctr += 1; + } + } +} + +pub mod aes { + use hash::*; + use ::rcrypto::blockmodes::*; + use ::rcrypto::aessafe::*; + use ::rcrypto::symmetriccipher::*; + use ::rcrypto::buffer::*; + + pub fn encrypt(k: &[u8], iv: &H128, plain: &[u8], dest: &mut [u8]) { + let mut encryptor = CtrMode::new(AesSafe128Encryptor::new(k), iv[..].to_vec()); + encryptor.encrypt(&mut RefReadBuffer::new(plain), &mut RefWriteBuffer::new(dest), true).expect("Invalid length or padding"); + } + + pub fn decrypt(k: &[u8], iv: &H128, encrypted: &[u8], dest: &mut [u8]) { + let mut encryptor = CtrMode::new(AesSafe128Encryptor::new(k), iv[..].to_vec()); + encryptor.decrypt(&mut RefReadBuffer::new(encrypted), &mut RefWriteBuffer::new(dest), true).expect("Invalid length or padding"); + } +} + + #[cfg(test)] mod tests { diff --git a/src/hash.rs b/src/hash.rs index ad918ef3d..4f27ae566 100644 --- a/src/hash.rs +++ b/src/hash.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use std::fmt; use std::ops; use std::hash::{Hash, Hasher}; -use std::ops::{Index, IndexMut, Deref, DerefMut, BitOr, BitAnd}; +use std::ops::{Index, IndexMut, Deref, DerefMut, BitOr, BitAnd, BitXor}; use rustc_serialize::hex::*; use error::EthcoreError; use rand::Rng; @@ -19,6 +19,7 @@ pub trait FixedHash: Sized + BytesConvertable { fn mut_bytes(&mut self) -> &mut [u8]; fn from_slice(src: &[u8]) -> Self; fn clone_from_slice(&mut self, src: &[u8]) -> usize; + fn copy_to(&self, dest: &mut [u8]); fn shift_bloom<'a, T>(&'a mut self, b: &T) -> &'a mut Self where T: FixedHash; fn bloom_part(&self, m: usize) -> T where T: FixedHash; fn contains_bloom(&self, b: &T) -> bool where T: FixedHash; @@ -95,6 +96,13 @@ macro_rules! impl_hash { r } + fn copy_to(&self, dest: &mut[u8]) { + unsafe { + let min = ::std::cmp::min($size, dest.len()); + ::std::ptr::copy(self.0.as_ptr(), dest.as_mut_ptr(), min); + } + } + fn shift_bloom<'a, T>(&'a mut self, b: &T) -> &'a mut Self where T: FixedHash { let bp: Self = b.bloom_part($size); let new_self = &bp | self; @@ -299,6 +307,30 @@ macro_rules! impl_hash { } } + /// BitXor on references + impl <'a> BitXor for &'a $from { + type Output = $from; + + fn bitxor(self, rhs: Self) -> Self::Output { + unsafe { + use std::mem; + let mut ret: $from = mem::uninitialized(); + for i in 0..$size { + ret.0[i] = self.0[i] ^ rhs.0[i]; + } + ret + } + } + } + + /// Moving BitXor + impl BitXor for $from { + type Output = $from; + + fn bitxor(self, rhs: Self) -> Self::Output { + &self ^ &rhs + } + } impl $from { pub fn hex(&self) -> String { format!("{}", self) diff --git a/src/lib.rs b/src/lib.rs index 27c927b2c..b39558d1b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,7 +34,7 @@ pub mod math; pub mod chainfilter; pub mod crypto; -//pub mod network; +pub mod network; // reexports pub use std::str::FromStr; diff --git a/src/network/connection.rs b/src/network/connection.rs new file mode 100644 index 000000000..c5deddf58 --- /dev/null +++ b/src/network/connection.rs @@ -0,0 +1,109 @@ +use std::io::{self, Cursor, Read}; +use mio::*; +use mio::tcp::*; +use hash::*; +use bytes::*; +use network::host::Host; + +pub struct Connection { + pub token: Token, + pub socket: TcpStream, + rec_buf: Bytes, + rec_size: usize, + send_buf: Cursor, + interest: EventSet, +} + +pub enum WriteStatus { + Ongoing, + Complete +} + +impl Connection { + pub fn new(token: Token, socket: TcpStream) -> Connection { + Connection { + token: token, + socket: socket, + send_buf: Cursor::new(Bytes::new()), + rec_buf: Bytes::new(), + rec_size: 0, + interest: EventSet::hup(), + } + } + + pub fn expect(&mut self, size: usize) { + if self.rec_size != self.rec_buf.len() { + warn!(target:"net", "Unexpected connection read start"); + } + unsafe { self.rec_buf.set_len(size) } + self.rec_size = size; + } + + pub fn readable(&mut self) -> io::Result> { + if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size { + warn!(target:"net", "Unexpected connection read"); + } + let max = self.rec_size - self.rec_buf.len(); + // resolve "multiple applicable items in scope [E0034]" error + let sock_ref = ::by_ref(&mut self.socket); + match sock_ref.take(max as u64).try_read_buf(&mut self.rec_buf) { + Ok(Some(_)) if self.rec_buf.len() == self.rec_size => Ok(Some(&self.rec_buf[0..self.rec_size])), + Ok(_) => Ok(None), + Err(e) => Err(e), + } + } + + pub fn send(&mut self, data: &[u8]) { //TODO: take ownership version + let send_size = self.send_buf.get_ref().len(); + if send_size != 0 || self.send_buf.position() as usize >= send_size { + warn!(target:"net", "Unexpected connection send start"); + } + if self.send_buf.get_ref().capacity() < data.len() { + let capacity = self.send_buf.get_ref().capacity(); + self.send_buf.get_mut().reserve(data.len() - capacity); + } + unsafe { self.send_buf.get_mut().set_len(data.len()) } + unsafe { ::std::ptr::copy_nonoverlapping(data.as_ptr(), self.send_buf.get_mut()[..].as_mut_ptr(), data.len()) }; + if !self.interest.is_writable() { + self.interest.insert(EventSet::writable()); + } + } + + pub fn writable(&mut self) -> io::Result { + let send_size = self.send_buf.get_ref().len(); + if (self.send_buf.position() as usize) >= send_size { + warn!(target:"net", "Unexpected connection data"); + return Ok(WriteStatus::Complete) + } + match self.socket.try_write_buf(&mut self.send_buf) { + Ok(_) if (self.send_buf.position() as usize) < send_size => { + self.interest.insert(EventSet::writable()); + Ok(WriteStatus::Ongoing) + }, + Ok(_) if (self.send_buf.position() as usize) == send_size => { + self.interest.remove(EventSet::writable()); + Ok(WriteStatus::Complete) + }, + Ok(_) => { panic!("Wrote past buffer");}, + Err(e) => Err(e) + } + } + + pub fn register(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { + trace!(target: "net", "connection register; token={:?}", self.token); + self.interest.insert(EventSet::readable()); + event_loop.register_opt(&self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { + error!("Failed to reregister {:?}, {:?}", self.token, e); + Err(e) + }) + } + + pub fn reregister(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { + trace!(target: "net", "connection reregister; token={:?}", self.token); + event_loop.reregister( &self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { + error!("Failed to reregister {:?}, {:?}", self.token, e); + Err(e) + }) + } +} + diff --git a/src/network/handshake.rs b/src/network/handshake.rs new file mode 100644 index 000000000..ef1cf418b --- /dev/null +++ b/src/network/handshake.rs @@ -0,0 +1,145 @@ +use mio::*; +use mio::tcp::*; +use hash::*; +use crypto::*; +use crypto; +use network::connection::{Connection, WriteStatus}; +use network::host::{NodeId, Host, HostInfo}; +use network::Error; + +#[derive(PartialEq, Eq)] +enum HandshakeState { + New, + ReadingAuth, + WritingAuth, + ReadingAck, + WritingAck, + WritingHello, + ReadingHello, + StartSession, +} + +pub struct Handshake { + pub id: NodeId, + pub connection: Connection, + state: HandshakeState, + idle_timeout: Option, + ecdhe: KeyPair, + nonce: H256, + remote_public: Public, + remote_nonce: H256 +} + +impl Handshake { + pub fn new(token: Token, id: &NodeId, socket: TcpStream, nonce: &H256) -> Result { + Ok(Handshake { + id: id.clone(), + connection: Connection::new(token, socket), + state: HandshakeState::New, + idle_timeout: None, + ecdhe: try!(KeyPair::create()), + nonce: nonce.clone(), + remote_public: Public::new(), + remote_nonce: H256::new() + }) + } + + pub fn start(&mut self, host: &HostInfo, originated: bool) { + if originated { + self.write_auth(host); + } + else { + self.read_auth(); + }; + } + + pub fn done(&self) -> bool { + self.state == HandshakeState::StartSession + } + + pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result<(), Error> { + self.idle_timeout.map(|t| event_loop.clear_timeout(t)); + Ok(()) + } + + pub fn writable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result<(), Error> { + self.idle_timeout.map(|t| event_loop.clear_timeout(t)); + match self.state { + HandshakeState::WritingAuth => { + match (try!(self.connection.writable())) { + WriteStatus::Complete => { try!(self.read_ack()); }, + _ => {} + }; + try!(self.connection.reregister(event_loop)); + }, + HandshakeState::WritingAck => { + match (try!(self.connection.writable())) { + WriteStatus::Complete => { try!(self.read_hello()); }, + _ => {} + }; + try!(self.connection.reregister(event_loop)); + }, + HandshakeState::WritingHello => { + match (try!(self.connection.writable())) { + WriteStatus::Complete => { self.state = HandshakeState::StartSession; }, + _ => { try!(self.connection.reregister(event_loop)); } + }; + }, + _ => { panic!("Unexpected state") } + } + Ok(()) + } + + pub fn register(&mut self, event_loop: &mut EventLoop) -> Result<(), Error> { + self.idle_timeout.map(|t| event_loop.clear_timeout(t)); + self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok(); + self.connection.register(event_loop); + Ok(()) + } + + fn read_auth(&mut self) -> Result<(), Error> { + Ok(()) + } + + fn read_ack(&mut self) -> Result<(), Error> { + Ok(()) + } + + fn read_hello(&mut self) -> Result<(), Error> { + Ok(()) + } + + fn write_auth(&mut self, host: &HostInfo) -> Result<(), Error> { + trace!(target:"net", "Sending auth to {:?}", self.connection.socket.peer_addr()); + let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants + let len = data.len(); + { + data[len - 1] = 0x0; + let (sig, rest) = data.split_at_mut(65); + let (hepubk, rest) = rest.split_at_mut(32); + let (mut pubk, rest) = rest.split_at_mut(64); + let (nonce, rest) = rest.split_at_mut(32); + + // E(remote-pubk, S(ecdhe-random, ecdh-shared-secret^nonce) || H(ecdhe-random-pubk) || pubk || nonce || 0x0) + let shared = try!(crypto::ecdh::agree(host.secret(), &self.id)); + let signature = try!(crypto::ec::sign(self.ecdhe.secret(), &(&shared ^ &self.nonce))).copy_to(sig); + self.ecdhe.public().sha3_into(hepubk); + host.id().copy_to(&mut pubk); + self.nonce.copy_to(nonce); + } + let message = try!(crypto::ecies::encrypt(&self.id, &data)); + self.connection.send(&message[..]); + self.state = HandshakeState::WritingAuth; + Ok(()) + } + + fn write_ack(&mut self) -> Result<(), Error> { + Ok(()) + } + + fn write_hello(&mut self) -> Result<(), Error> { + Ok(()) + } + + +} diff --git a/src/network/host.rs b/src/network/host.rs index c945b0e57..d436fae5f 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -11,9 +11,11 @@ use mio::util::{Slab}; use mio::tcp::*; use mio::udp::*; use hash::*; -use bytes::*; +use crypto::*; use time::Tm; use error::EthcoreError; +use network::connection::Connection; +use network::handshake::Handshake; const DEFAULT_PORT: u16 = 30303; @@ -27,9 +29,7 @@ const IDEAL_PEERS:u32 = 10; const BUCKET_SIZE: u32 = 16; ///< Denoted by k in [Kademlia]. Number of nodes stored in each bucket. const ALPHA: usize = 3; ///< Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests. -type NodeId = H512; -type PublicKey = H512; -type SecretKey = H256; +pub type NodeId = H512; #[derive(Debug)] struct NetworkConfiguration { @@ -47,7 +47,7 @@ impl NetworkConfiguration { public_address: SocketAddr::from_str("0.0.0.0:30303").unwrap(), no_nat: false, no_discovery: false, - pin: false + pin: false, } } } @@ -164,45 +164,6 @@ impl NodeBucket { } } -struct Connection { - socket: TcpStream, - send_queue: Vec, -} - -impl Connection { - fn new(socket: TcpStream) -> Connection { - Connection { - socket: socket, - send_queue: Vec::new(), - } - } -} - -#[derive(PartialEq, Eq)] -enum HandshakeState { - New, - AckAuth, - WriteHello, - ReadHello, - StartSession, -} - -struct Handshake { - id: NodeId, - connection: Connection, - state: HandshakeState, -} - -impl Handshake { - fn new(id: NodeId, socket: TcpStream) -> Handshake { - Handshake { - id: id, - connection: Connection::new(socket), - state: HandshakeState::New - } - } -} - struct Peer { id: NodeId, connection: Connection, @@ -214,7 +175,7 @@ impl FindNodePacket { fn new(_endpoint: &NodeEndpoint, _id: &NodeId) -> FindNodePacket { FindNodePacket } - fn sign(&mut self, _secret: &SecretKey) { + fn sign(&mut self, _secret: &Secret) { } fn send(& self, _socket: &mut UdpSocket) { @@ -236,11 +197,29 @@ pub enum HostMessage { Shutdown } -pub struct Host { - secret: SecretKey, - node: Node, - sender: Sender, +pub struct HostInfo { + keys: KeyPair, config: NetworkConfiguration, + nonce: H256 +} + +impl HostInfo { + pub fn id(&self) -> &NodeId { + self.keys.public() + } + + pub fn secret(&self) -> &Secret { + self.keys.secret() + } + pub fn next_nonce(&mut self) -> H256 { + self.nonce = self.nonce.sha3(); + return self.nonce.clone(); + } +} + +pub struct Host { + info: HostInfo, + sender: Sender, udp_socket: UdpSocket, listener: TcpListener, peers: Slab, @@ -282,9 +261,11 @@ impl Host { event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap(); let mut host = Host { - secret: SecretKey::new(), - node: Node::new(NodeId::new(), config.public_address.clone(), PeerType::Required), - config: config, + info: HostInfo { + keys: KeyPair::create().unwrap(), + config: config, + nonce: H256::random() + }, sender: sender, udp_socket: udp_socket, listener: listener, @@ -338,7 +319,7 @@ impl Host { } let mut tried_count = 0; { - let nearest = Host::nearest_node_entries(&self.node.id, &self.discovery_id, &self.node_buckets).into_iter(); + let nearest = Host::nearest_node_entries(&self.info.id(), &self.discovery_id, &self.node_buckets).into_iter(); let nodes = RefCell::new(&mut self.discovery_nodes); let nearest = nearest.filter(|x| nodes.borrow().contains(&x)).take(ALPHA); for r in nearest { @@ -380,14 +361,14 @@ impl Host { ret } - fn nearest_node_entries<'a>(source: &NodeId, target: &NodeId, buckets: &'a Vec) -> Vec<&'a NodeId> + fn nearest_node_entries<'b>(source: &NodeId, target: &NodeId, buckets: &'b Vec) -> Vec<&'b NodeId> { // send ALPHA FindNode packets to nodes we know, closest to target const LAST_BIN: u32 = NODE_BINS - 1; let mut head = Host::distance(source, target); let mut tail = if head == 0 { LAST_BIN } else { (head - 1) % NODE_BINS }; - let mut found: BTreeMap> = BTreeMap::new(); + let mut found: BTreeMap> = BTreeMap::new(); let mut count = 0; // if d is 0, then we roll look forward, if last, we reverse, else, spread from d @@ -463,7 +444,6 @@ impl Host { } fn maintain_network(&mut self, event_loop: &mut EventLoop) { - self.keep_alive(); self.connect_peers(event_loop); } @@ -491,7 +471,7 @@ impl Host { if connected && required { req_conn += 1; } - else if !connected && (!self.config.pin || required) { + else if !connected && (!self.info.config.pin || required) { to_connect.push(n); } } @@ -505,7 +485,7 @@ impl Host { } } - if !self.config.pin + if !self.info.config.pin { let pending_count = 0; //TODO: let peer_count = 0; @@ -532,56 +512,88 @@ impl Host { warn!("Aborted connect. Node already connecting."); return; } - let node = self.nodes.get_mut(id).unwrap(); - node.last_attempted = Some(::time::now()); - - - //blog(NetConnect) << "Attempting connection to node" << _p->id << "@" << ep << "from" << id(); - let socket = match TcpStream::connect(&node.endpoint.address) { - Ok(socket) => socket, - Err(_) => { - warn!("Cannot connect to node"); - return; + + let socket = { + let node = self.nodes.get_mut(id).unwrap(); + node.last_attempted = Some(::time::now()); + + + //blog(NetConnect) << "Attempting connection to node" << _p->id << "@" << ep << "from" << id(); + match TcpStream::connect(&node.endpoint.address) { + Ok(socket) => socket, + Err(_) => { + warn!("Cannot connect to node"); + return; + } } }; - let handshake = Handshake::new(id.clone(), socket); - match self.connecting.insert(handshake) { - Ok(token) => event_loop.register_opt(&self.connecting[token].connection.socket, token, EventSet::all(), PollOpt::edge()).unwrap(), - Err(_) => warn!("Max connections reached") - }; + + let nonce = self.info.next_nonce(); + match self.connecting.insert_with(|token| Handshake::new(token, id, socket, &nonce).expect("Can't create handshake")) { + Some(token) => { + self.connecting[token].register(event_loop).expect("Handshake token regisration failed"); + self.connecting[token].start(&self.info, true); + }, + None => { warn!("Max connections reached") } + } } - fn keep_alive(&mut self) { - } - - fn accept(&mut self, _event_loop: &mut EventLoop) { - warn!(target "net", "accept"); + warn!(target: "net", "accept"); } - fn start_handshake(&mut self, token: Token, _event_loop: &mut EventLoop) { - let handshake = match self.handshakes.get(&token) { - Some(h) => h, - None => { - warn!(target "net", "Received event for unknown handshake"); - return; + fn handshake_writable(&mut self, token: Token, event_loop: &mut EventLoop) { + if !{ + let handshake = match self.connecting.get_mut(token) { + Some(h) => h, + None => { + warn!(target: "net", "Received event for unknown handshake"); + return; + } + }; + match handshake.writable(event_loop, &self.info) { + Err(e) => { + debug!(target: "net", "Handshake read error: {:?}", e); + false + }, + Ok(_) => true } - }; - - - - + } { + self.kill_handshake(token, event_loop); + } + } + fn handshake_readable(&mut self, token: Token, event_loop: &mut EventLoop) { + if !{ + let handshake = match self.connecting.get_mut(token) { + Some(h) => h, + None => { + warn!(target: "net", "Received event for unknown handshake"); + return; + } + }; + match handshake.writable(event_loop, &self.info) { + Err(e) => { + debug!(target: "net", "Handshake read error: {:?}", e); + false + }, + Ok(_) => true + } + } { + self.kill_handshake(token, event_loop); + } + } + fn handshake_timeout(&mut self, token: Token, event_loop: &mut EventLoop) { + self.kill_handshake(token, event_loop) + } + fn kill_handshake(&mut self, token: Token, _event_loop: &mut EventLoop) { + self.connecting.remove(token); } - fn read_handshake(&mut self, _event_loop: &mut EventLoop) { - warn!(target "net", "accept"); + fn read_connection(&mut self, _token: Token, _event_loop: &mut EventLoop) { } - fn read_connection(&mut self, _event_loop: &mut EventLoop) { - } - - fn write_connection(&mut self, _event_loop: &mut EventLoop) { + fn write_connection(&mut self, _token: Token, _event_loop: &mut EventLoop) { } } @@ -594,16 +606,16 @@ impl Handler for Host { match token.as_usize() { TCP_ACCEPT => self.accept(event_loop), IDLE => self.maintain_network(event_loop), - FIRST_CONNECTION ... LAST_CONNECTION => self.read_connection(event_loop), - FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.read_handshake(event_loop), + FIRST_CONNECTION ... LAST_CONNECTION => self.read_connection(token, event_loop), + FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.handshake_readable(token, event_loop), NODETABLE_RECEIVE => {}, _ => panic!("Received unknown readable token"), } } else if events.is_writable() { match token.as_usize() { - FIRST_CONNECTION ... LAST_CONNECTION => self.write_connection(event_loop), - FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.start_handshake(event_loop), + FIRST_CONNECTION ... LAST_CONNECTION => self.write_connection(token, event_loop), + FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.handshake_writable(token, event_loop), _ => panic!("Received unknown writable token"), } } @@ -612,6 +624,7 @@ impl Handler for Host { fn timeout(&mut self, event_loop: &mut EventLoop, token: Token) { match token.as_usize() { IDLE => self.maintain_network(event_loop), + FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.handshake_timeout(token, event_loop), NODETABLE_DISCOVERY => {}, NODETABLE_MAINTAIN => {}, _ => panic!("Received unknown timer token"), diff --git a/src/network/mod.rs b/src/network/mod.rs index 917d79464..b8682dd12 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -1,2 +1,24 @@ extern crate mio; pub mod host; +pub mod connection; +pub mod handshake; + + +#[derive(Debug)] +pub enum Error { + Crypto(::crypto::CryptoError), + Io(::std::io::Error), +} + +impl From<::std::io::Error> for Error { + fn from(err: ::std::io::Error) -> Error { + Error::Io(err) + } +} + +impl From<::crypto::CryptoError> for Error { + fn from(err: ::crypto::CryptoError) -> Error { + Error::Crypto(err) + } +} + diff --git a/src/sha3.rs b/src/sha3.rs index ee328913c..c466f8915 100644 --- a/src/sha3.rs +++ b/src/sha3.rs @@ -5,6 +5,7 @@ use hash::{FixedHash, H256}; pub trait Hashable { fn sha3(&self) -> H256; + fn sha3_into(&self, dest: &mut [u8]); } impl Hashable for T where T: BytesConvertable { @@ -15,6 +16,9 @@ impl Hashable for T where T: BytesConvertable { ret } } + fn sha3_into(&self, dest: &mut [u8]) { + keccak_256(self.bytes(), dest); + } } #[test]