2016-12-11 19:31:31 +01:00
// Copyright 2015, 2016 Parity Technologies (UK) Ltd.
2016-02-05 13:40:41 +01:00
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
2016-01-24 18:53:54 +01:00
use std ::sync ::Arc ;
2015-12-02 20:11:13 +01:00
use std ::collections ::VecDeque ;
2016-02-15 19:54:27 +01:00
use std ::net ::SocketAddr ;
2016-06-13 18:55:24 +02:00
use std ::sync ::atomic ::{ AtomicBool , Ordering as AtomicOrdering } ;
2016-10-30 09:56:34 +01:00
use mio ::{ Token , Ready , PollOpt } ;
use mio ::deprecated ::{ Handler , EventLoop , TryRead , TryWrite } ;
2015-11-30 16:38:55 +01:00
use mio ::tcp ::* ;
2016-08-05 10:32:04 +02:00
use util ::hash ::* ;
use util ::sha3 ::* ;
use util ::bytes ::* ;
2016-09-01 14:49:12 +02:00
use rlp ::* ;
2016-02-03 01:55:18 +01:00
use std ::io ::{ self , Cursor , Read , Write } ;
2016-01-10 12:53:55 +01:00
use error ::* ;
2016-01-21 16:48:37 +01:00
use io ::{ IoContext , StreamToken } ;
2016-08-05 10:32:04 +02:00
use handshake ::Handshake ;
use stats ::NetworkStats ;
2015-12-02 12:07:46 +01:00
use rcrypto ::blockmodes ::* ;
use rcrypto ::aessafe ::* ;
use rcrypto ::symmetriccipher ::* ;
use rcrypto ::buffer ::* ;
use tiny_keccak ::Keccak ;
2016-10-30 09:56:34 +01:00
use bytes ::{ Buf , MutBuf } ;
2016-08-24 18:35:21 +02:00
use crypto ;
2015-12-02 12:07:46 +01:00
const ENCRYPTED_HEADER_LEN : usize = 32 ;
2016-01-21 16:48:37 +01:00
const RECIEVE_PAYLOAD_TIMEOUT : u64 = 30000 ;
2015-11-30 16:38:55 +01:00
2016-02-03 12:04:24 +01:00
pub trait GenericSocket : Read + Write {
2016-02-03 01:55:18 +01:00
}
impl GenericSocket for TcpStream {
}
pub struct GenericConnection < Socket : GenericSocket > {
2016-01-10 22:42:27 +01:00
/// Connection id (token)
2016-01-21 16:48:37 +01:00
pub token : StreamToken ,
2016-01-10 22:42:27 +01:00
/// Network socket
2016-02-03 01:55:18 +01:00
pub socket : Socket ,
2016-01-10 22:42:27 +01:00
/// Receive buffer
2015-11-30 16:38:55 +01:00
rec_buf : Bytes ,
2016-01-10 22:42:27 +01:00
/// Expected size
2015-11-30 16:38:55 +01:00
rec_size : usize ,
2016-01-10 22:42:27 +01:00
/// Send out packets FIFO
2015-12-02 20:11:13 +01:00
send_queue : VecDeque < Cursor < Bytes > > ,
2016-01-10 22:42:27 +01:00
/// Event flags this connection expects
2016-10-30 09:56:34 +01:00
interest : Ready ,
2016-02-03 01:55:18 +01:00
/// Shared network statistics
2016-01-24 18:53:54 +01:00
stats : Arc < NetworkStats > ,
2016-06-13 18:55:24 +02:00
/// Registered flag
registered : AtomicBool ,
2015-11-30 16:38:55 +01:00
}
2016-02-03 01:55:18 +01:00
impl < Socket : GenericSocket > GenericConnection < Socket > {
2015-12-02 20:11:13 +01:00
pub fn expect ( & mut self , size : usize ) {
2016-06-13 18:55:24 +02:00
trace! ( target :" network " , " Expect to read {} bytes " , size ) ;
2015-12-02 20:11:13 +01:00
if self . rec_size ! = self . rec_buf . len ( ) {
2016-06-13 18:55:24 +02:00
warn! ( target :" network " , " Unexpected connection read start " ) ;
2015-12-02 20:11:13 +01:00
}
self . rec_size = size ;
}
2016-01-10 22:42:27 +01:00
/// Readable IO handler. Called when there is some data to be read.
2015-12-02 20:11:13 +01:00
pub fn readable ( & mut self ) -> io ::Result < Option < Bytes > > {
if self . rec_size = = 0 | | self . rec_buf . len ( ) > = self . rec_size {
2016-07-26 16:04:14 +02:00
return Ok ( None ) ;
2015-12-02 20:11:13 +01:00
}
2016-02-03 01:55:18 +01:00
let sock_ref = < Socket as Read > ::by_ref ( & mut self . socket ) ;
2016-06-13 18:55:24 +02:00
loop {
let max = self . rec_size - self . rec_buf . len ( ) ;
2016-10-30 09:56:34 +01:00
match sock_ref . take ( max as u64 ) . try_read ( unsafe { self . rec_buf . mut_bytes ( ) } ) {
2016-06-13 18:55:24 +02:00
Ok ( Some ( size ) ) if size ! = 0 = > {
2016-10-30 09:56:34 +01:00
unsafe { self . rec_buf . advance ( size ) ; }
2016-06-13 18:55:24 +02:00
self . stats . inc_recv ( size ) ;
trace! ( target :" network " , " {}: Read {} of {} bytes " , self . token , self . rec_buf . len ( ) , self . rec_size ) ;
if self . rec_size ! = 0 & & self . rec_buf . len ( ) = = self . rec_size {
self . rec_size = 0 ;
return Ok ( Some ( ::std ::mem ::replace ( & mut self . rec_buf , Bytes ::new ( ) ) ) )
}
else if self . rec_buf . len ( ) > self . rec_size {
warn! ( target :" network " , " Read past buffer {} bytes " , self . rec_buf . len ( ) - self . rec_size ) ;
return Ok ( Some ( ::std ::mem ::replace ( & mut self . rec_buf , Bytes ::new ( ) ) ) )
}
} ,
Ok ( _ ) = > return Ok ( None ) ,
2016-07-13 19:59:59 +02:00
Err ( e ) = > {
2016-06-13 18:55:24 +02:00
debug! ( target :" network " , " Read error {} ({}) " , self . token , e ) ;
return Err ( e )
}
}
}
2016-07-13 19:59:59 +02:00
}
2015-12-03 15:11:40 +01:00
2016-01-10 22:42:27 +01:00
/// Add a packet to send queue.
2016-10-20 14:49:12 +02:00
pub fn send < Message > ( & mut self , io : & IoContext < Message > , data : Bytes ) where Message : Send + Clone + Sync + 'static {
2016-01-19 12:14:29 +01:00
if ! data . is_empty ( ) {
2016-10-18 18:16:00 +02:00
trace! ( target :" network " , " {}: Sending {} bytes " , self . token , data . len ( ) ) ;
2015-12-02 20:11:13 +01:00
self . send_queue . push_back ( Cursor ::new ( data ) ) ;
2016-10-18 18:16:00 +02:00
if ! self . interest . is_writable ( ) {
2016-10-30 09:56:34 +01:00
self . interest . insert ( Ready ::writable ( ) ) ;
2016-10-18 18:16:00 +02:00
}
2016-06-13 18:55:24 +02:00
io . update_registration ( self . token ) . ok ( ) ;
2015-12-03 15:11:40 +01:00
}
2015-12-02 20:11:13 +01:00
}
2016-05-02 14:48:30 +02:00
/// Check if this connection has data to be sent.
pub fn is_sending ( & self ) -> bool {
self . interest . is_writable ( )
}
2016-01-10 22:42:27 +01:00
/// Writable IO handler. Called when the socket is ready to send.
2016-10-20 14:49:12 +02:00
pub fn writable < Message > ( & mut self , io : & IoContext < Message > ) -> Result < WriteStatus , NetworkError > where Message : Send + Clone + Sync + 'static {
2015-12-02 20:11:13 +01:00
{
2016-10-31 19:58:47 +01:00
let buf = match self . send_queue . front_mut ( ) {
Some ( buf ) = > buf ,
None = > return Ok ( WriteStatus ::Complete ) ,
} ;
2015-12-02 20:11:13 +01:00
let send_size = buf . get_ref ( ) . len ( ) ;
2016-10-30 09:56:34 +01:00
let pos = buf . position ( ) as usize ;
if ( pos as usize ) > = send_size {
2015-12-02 20:11:13 +01:00
warn! ( target :" net " , " Unexpected connection data " ) ;
return Ok ( WriteStatus ::Complete )
}
2016-10-30 09:56:34 +01:00
let buf = buf as & mut Buf ;
match self . socket . try_write ( buf . bytes ( ) ) {
Ok ( Some ( size ) ) if ( pos + size ) < send_size = > {
buf . advance ( size ) ;
2016-01-24 18:53:54 +01:00
self . stats . inc_send ( size ) ;
2015-12-02 20:11:13 +01:00
Ok ( WriteStatus ::Ongoing )
} ,
2016-10-30 09:56:34 +01:00
Ok ( Some ( size ) ) if ( pos + size ) = = send_size = > {
2016-01-24 18:53:54 +01:00
self . stats . inc_send ( size ) ;
2016-06-18 15:10:24 +02:00
trace! ( target :" network " , " {}: Wrote {} bytes " , self . token , send_size ) ;
2015-12-02 20:11:13 +01:00
Ok ( WriteStatus ::Complete )
} ,
2016-01-24 18:53:54 +01:00
Ok ( Some ( _ ) ) = > { panic! ( " Wrote past buffer " ) ; } ,
Ok ( None ) = > Ok ( WriteStatus ::Ongoing ) ,
2016-12-27 12:53:56 +01:00
Err ( e ) = > Err ( e ) ?
2015-12-02 20:11:13 +01:00
}
2015-12-03 15:11:40 +01:00
} . and_then ( | r | {
if r = = WriteStatus ::Complete {
2015-12-02 20:11:13 +01:00
self . send_queue . pop_front ( ) ;
2016-01-08 13:10:00 +01:00
}
2015-12-03 15:11:40 +01:00
if self . send_queue . is_empty ( ) {
2016-10-30 09:56:34 +01:00
self . interest . remove ( Ready ::writable ( ) ) ;
2016-12-27 12:53:56 +01:00
io . update_registration ( self . token ) ? ;
2015-12-03 15:11:40 +01:00
}
Ok ( r )
} )
2015-12-02 20:11:13 +01:00
}
2016-02-03 01:55:18 +01:00
}
/// Low level tcp connection
pub type Connection = GenericConnection < TcpStream > ;
impl Connection {
/// Create a new connection with given id and socket.
pub fn new ( token : StreamToken , socket : TcpStream , stats : Arc < NetworkStats > ) -> Connection {
Connection {
token : token ,
socket : socket ,
send_queue : VecDeque ::new ( ) ,
rec_buf : Bytes ::new ( ) ,
rec_size : 0 ,
2016-10-30 09:56:34 +01:00
interest : Ready ::hup ( ) | Ready ::readable ( ) ,
2016-02-03 01:55:18 +01:00
stats : stats ,
2016-06-13 18:55:24 +02:00
registered : AtomicBool ::new ( false ) ,
2016-02-03 01:55:18 +01:00
}
}
2015-12-02 20:11:13 +01:00
2016-03-12 10:07:55 +01:00
/// Get socket token
2016-02-12 09:52:32 +01:00
pub fn token ( & self ) -> StreamToken {
self . token
}
2016-02-15 19:54:27 +01:00
/// Get remote peer address
pub fn remote_addr ( & self ) -> io ::Result < SocketAddr > {
self . socket . peer_addr ( )
}
2016-06-02 11:49:56 +02:00
/// Get remote peer address string
pub fn remote_addr_str ( & self ) -> String {
self . socket . peer_addr ( ) . map ( | a | a . to_string ( ) ) . unwrap_or_else ( | _ | " Unknown " . to_owned ( ) )
}
2016-10-12 20:18:59 +02:00
/// Get local peer address string
pub fn local_addr_str ( & self ) -> String {
self . socket . local_addr ( ) . map ( | a | a . to_string ( ) ) . unwrap_or_else ( | _ | " Unknown " . to_owned ( ) )
}
2016-02-21 16:52:25 +01:00
/// Clone this connection. Clears the receiving buffer of the returned connection.
2016-02-19 22:09:06 +01:00
pub fn try_clone ( & self ) -> io ::Result < Self > {
Ok ( Connection {
token : self . token ,
2016-12-27 12:53:56 +01:00
socket : self . socket . try_clone ( ) ? ,
2016-02-19 22:09:06 +01:00
rec_buf : Vec ::new ( ) ,
rec_size : 0 ,
2016-02-20 01:10:27 +01:00
send_queue : self . send_queue . clone ( ) ,
2016-10-30 09:56:34 +01:00
interest : Ready ::hup ( ) ,
2016-02-19 22:09:06 +01:00
stats : self . stats . clone ( ) ,
2016-06-13 18:55:24 +02:00
registered : AtomicBool ::new ( false ) ,
2016-02-19 22:09:06 +01:00
} )
}
2016-01-10 22:42:27 +01:00
/// Register this connection with the IO event loop.
2016-01-21 16:48:37 +01:00
pub fn register_socket < Host : Handler > ( & self , reg : Token , event_loop : & mut EventLoop < Host > ) -> io ::Result < ( ) > {
2016-06-13 18:55:24 +02:00
if self . registered . load ( AtomicOrdering ::SeqCst ) {
return Ok ( ( ) ) ;
}
2016-03-05 23:09:51 +01:00
trace! ( target : " network " , " connection register; token={:?} " , reg ) ;
2016-06-02 11:49:56 +02:00
if let Err ( e ) = event_loop . register ( & self . socket , reg , self . interest , PollOpt ::edge ( ) /* | PollOpt::oneshot() */ ) { // TODO: oneshot is broken on windows
2016-03-05 23:09:51 +01:00
trace! ( target : " network " , " Failed to register {:?}, {:?} " , reg , e ) ;
2016-02-20 01:10:27 +01:00
}
2016-06-13 18:55:24 +02:00
self . registered . store ( true , AtomicOrdering ::SeqCst ) ;
2016-02-20 01:10:27 +01:00
Ok ( ( ) )
2015-12-03 15:11:40 +01:00
}
2016-01-10 22:42:27 +01:00
/// Update connection registration. Should be called at the end of the IO handler.
2016-01-21 16:48:37 +01:00
pub fn update_socket < Host : Handler > ( & self , reg : Token , event_loop : & mut EventLoop < Host > ) -> io ::Result < ( ) > {
2016-03-05 23:09:51 +01:00
trace! ( target : " network " , " connection reregister; token={:?} " , reg ) ;
2016-06-13 18:55:24 +02:00
if ! self . registered . load ( AtomicOrdering ::SeqCst ) {
self . register_socket ( reg , event_loop )
} else {
event_loop . reregister ( & self . socket , reg , self . interest , PollOpt ::edge ( ) /* | PollOpt::oneshot() */ ) . unwrap_or_else ( | e | { // TODO: oneshot is broken on windows
trace! ( target : " network " , " Failed to reregister {:?}, {:?} " , reg , e ) ;
} ) ;
2016-01-23 02:36:58 +01:00
Ok ( ( ) )
2016-06-13 18:55:24 +02:00
}
2015-12-03 15:11:40 +01:00
}
2016-01-22 18:13:59 +01:00
/// Delete connection registration. Should be called at the end of the IO handler.
pub fn deregister_socket < Host : Handler > ( & self , event_loop : & mut EventLoop < Host > ) -> io ::Result < ( ) > {
2016-03-05 23:09:51 +01:00
trace! ( target : " network " , " connection deregister; token={:?} " , self . token ) ;
2016-01-22 18:13:59 +01:00
event_loop . deregister ( & self . socket ) . ok ( ) ; // ignore errors here
Ok ( ( ) )
}
2015-12-02 20:11:13 +01:00
}
2016-02-03 01:55:18 +01:00
/// Connection write status.
#[ derive(PartialEq, Eq) ]
pub enum WriteStatus {
/// Some data is still pending for current packet
Ongoing ,
/// All data sent.
Complete
}
2016-04-06 10:07:24 +02:00
/// `RLPx` packet
2015-12-02 20:11:13 +01:00
pub struct Packet {
pub protocol : u16 ,
pub data : Bytes ,
}
2016-01-10 22:42:27 +01:00
/// Encrypted connection receiving state.
2015-12-02 12:07:46 +01:00
enum EncryptedConnectionState {
2016-01-10 22:42:27 +01:00
/// Reading a header.
2015-12-02 12:07:46 +01:00
Header ,
2016-01-10 22:42:27 +01:00
/// Reading the rest of the packet.
2015-12-02 12:07:46 +01:00
Payload ,
}
2016-04-06 10:07:24 +02:00
/// Connection implementing `RLPx` framing
2016-01-10 22:42:27 +01:00
/// https://github.com/ethereum/devp2p/blob/master/rlpx.md#framing
2015-12-02 12:07:46 +01:00
pub struct EncryptedConnection {
2016-01-10 22:42:27 +01:00
/// Underlying tcp connection
2016-06-02 11:49:56 +02:00
pub connection : Connection ,
2016-01-10 22:42:27 +01:00
/// Egress data encryptor
2015-12-03 15:11:40 +01:00
encoder : CtrMode < AesSafe256Encryptor > ,
2016-01-10 22:42:27 +01:00
/// Ingress data decryptor
2015-12-03 15:11:40 +01:00
decoder : CtrMode < AesSafe256Encryptor > ,
2016-01-10 22:42:27 +01:00
/// Ingress data decryptor
2015-12-03 15:11:40 +01:00
mac_encoder : EcbEncryptor < AesSafe256Encryptor , EncPadding < NoPadding > > ,
2016-01-10 22:42:27 +01:00
/// MAC for egress data
2015-12-02 12:07:46 +01:00
egress_mac : Keccak ,
2016-01-10 22:42:27 +01:00
/// MAC for ingress data
2015-12-02 12:07:46 +01:00
ingress_mac : Keccak ,
2016-01-10 22:42:27 +01:00
/// Read state
2015-12-02 12:07:46 +01:00
read_state : EncryptedConnectionState ,
2016-01-10 22:42:27 +01:00
/// Protocol id for the last received packet
2015-12-02 12:07:46 +01:00
protocol_id : u16 ,
2016-01-10 22:42:27 +01:00
/// Payload expected to be received for the last header.
2016-01-10 14:02:01 +01:00
payload_len : usize ,
2015-12-02 12:07:46 +01:00
}
impl EncryptedConnection {
2016-06-13 18:55:24 +02:00
/// Create an encrypted connection out of the handshake.
2016-08-05 10:32:04 +02:00
pub fn new ( handshake : & mut Handshake ) -> Result < EncryptedConnection , NetworkError > {
2016-12-27 12:53:56 +01:00
let shared = crypto ::ecdh ::agree ( handshake . ecdhe . secret ( ) , & handshake . remote_ephemeral ) ? ;
2015-12-02 12:07:46 +01:00
let mut nonce_material = H512 ::new ( ) ;
if handshake . originated {
handshake . remote_nonce . copy_to ( & mut nonce_material [ 0 .. 32 ] ) ;
handshake . nonce . copy_to ( & mut nonce_material [ 32 .. 64 ] ) ;
}
else {
handshake . nonce . copy_to ( & mut nonce_material [ 0 .. 32 ] ) ;
handshake . remote_nonce . copy_to ( & mut nonce_material [ 32 .. 64 ] ) ;
}
let mut key_material = H512 ::new ( ) ;
shared . copy_to ( & mut key_material [ 0 .. 32 ] ) ;
nonce_material . sha3_into ( & mut key_material [ 32 .. 64 ] ) ;
key_material . sha3 ( ) . copy_to ( & mut key_material [ 32 .. 64 ] ) ;
2015-12-03 15:11:40 +01:00
key_material . sha3 ( ) . copy_to ( & mut key_material [ 32 .. 64 ] ) ;
2015-12-02 12:07:46 +01:00
let iv = vec! [ 0 u8 ; 16 ] ;
2015-12-03 15:11:40 +01:00
let encoder = CtrMode ::new ( AesSafe256Encryptor ::new ( & key_material [ 32 .. 64 ] ) , iv ) ;
2015-12-02 12:07:46 +01:00
let iv = vec! [ 0 u8 ; 16 ] ;
2015-12-03 15:11:40 +01:00
let decoder = CtrMode ::new ( AesSafe256Encryptor ::new ( & key_material [ 32 .. 64 ] ) , iv ) ;
2015-12-02 12:07:46 +01:00
key_material . sha3 ( ) . copy_to ( & mut key_material [ 32 .. 64 ] ) ;
2015-12-03 15:11:40 +01:00
let mac_encoder = EcbEncryptor ::new ( AesSafe256Encryptor ::new ( & key_material [ 32 .. 64 ] ) , NoPadding ) ;
2015-12-02 12:07:46 +01:00
let mut egress_mac = Keccak ::new_keccak256 ( ) ;
let mut mac_material = & H256 ::from_slice ( & key_material [ 32 .. 64 ] ) ^ & handshake . remote_nonce ;
egress_mac . update ( & mac_material ) ;
egress_mac . update ( if handshake . originated { & handshake . auth_cipher } else { & handshake . ack_cipher } ) ;
2015-12-03 15:11:40 +01:00
2015-12-02 12:07:46 +01:00
let mut ingress_mac = Keccak ::new_keccak256 ( ) ;
2015-12-03 15:11:40 +01:00
mac_material = & H256 ::from_slice ( & key_material [ 32 .. 64 ] ) ^ & handshake . nonce ;
2015-12-02 12:07:46 +01:00
ingress_mac . update ( & mac_material ) ;
ingress_mac . update ( if handshake . originated { & handshake . ack_cipher } else { & handshake . auth_cipher } ) ;
2016-12-27 12:53:56 +01:00
let old_connection = handshake . connection . try_clone ( ) ? ;
2016-06-02 11:49:56 +02:00
let connection = ::std ::mem ::replace ( & mut handshake . connection , old_connection ) ;
2016-02-19 22:09:06 +01:00
let mut enc = EncryptedConnection {
2016-06-02 11:49:56 +02:00
connection : connection ,
2015-12-02 12:07:46 +01:00
encoder : encoder ,
decoder : decoder ,
mac_encoder : mac_encoder ,
egress_mac : egress_mac ,
ingress_mac : ingress_mac ,
read_state : EncryptedConnectionState ::Header ,
protocol_id : 0 ,
payload_len : 0
2016-02-19 22:09:06 +01:00
} ;
enc . connection . expect ( ENCRYPTED_HEADER_LEN ) ;
Ok ( enc )
2015-12-02 12:07:46 +01:00
}
2016-01-10 22:42:27 +01:00
/// Send a packet
2016-10-20 14:49:12 +02:00
pub fn send_packet < Message > ( & mut self , io : & IoContext < Message > , payload : & [ u8 ] ) -> Result < ( ) , NetworkError > where Message : Send + Clone + Sync + 'static {
2015-12-02 12:07:46 +01:00
let mut header = RlpStream ::new ( ) ;
let len = payload . len ( ) as usize ;
header . append_raw ( & [ ( len > > 16 ) as u8 , ( len > > 8 ) as u8 , len as u8 ] , 1 ) ;
header . append_raw ( & [ 0xc2 u8 , 0x80 u8 , 0x80 u8 ] , 1 ) ;
//TODO: ger rid of vectors here
let mut header = header . out ( ) ;
let padding = ( 16 - ( payload . len ( ) % 16 ) ) % 16 ;
header . resize ( 16 , 0 u8 ) ;
let mut packet = vec! [ 0 u8 ; ( 32 + payload . len ( ) + padding + 16 ) ] ;
self . encoder . encrypt ( & mut RefReadBuffer ::new ( & header ) , & mut RefWriteBuffer ::new ( & mut packet ) , false ) . expect ( " Invalid length or padding " ) ;
2015-12-03 15:11:40 +01:00
EncryptedConnection ::update_mac ( & mut self . egress_mac , & mut self . mac_encoder , & packet [ 0 .. 16 ] ) ;
2015-12-02 12:07:46 +01:00
self . egress_mac . clone ( ) . finalize ( & mut packet [ 16 .. 32 ] ) ;
2016-07-26 20:31:25 +02:00
self . encoder . encrypt ( & mut RefReadBuffer ::new ( payload ) , & mut RefWriteBuffer ::new ( & mut packet [ 32 .. ( 32 + len ) ] ) , padding = = 0 ) . expect ( " Invalid length or padding " ) ;
2015-12-02 12:07:46 +01:00
if padding ! = 0 {
2016-01-10 14:02:01 +01:00
let pad = [ 0 u8 ; 16 ] ;
2015-12-03 15:11:40 +01:00
self . encoder . encrypt ( & mut RefReadBuffer ::new ( & pad [ 0 .. padding ] ) , & mut RefWriteBuffer ::new ( & mut packet [ ( 32 + len ) .. ( 32 + len + padding ) ] ) , true ) . expect ( " Invalid length or padding " ) ;
2015-12-02 12:07:46 +01:00
}
self . egress_mac . update ( & packet [ 32 .. ( 32 + len + padding ) ] ) ;
2015-12-03 15:11:40 +01:00
EncryptedConnection ::update_mac ( & mut self . egress_mac , & mut self . mac_encoder , & [ 0 u8 ; 0 ] ) ;
2015-12-02 12:07:46 +01:00
self . egress_mac . clone ( ) . finalize ( & mut packet [ ( 32 + len + padding ) .. ] ) ;
2016-06-13 18:55:24 +02:00
self . connection . send ( io , packet ) ;
2015-12-02 12:07:46 +01:00
Ok ( ( ) )
}
2016-01-10 22:42:27 +01:00
/// Decrypt and authenticate an incoming packet header. Prepare for receiving payload.
2016-08-05 10:32:04 +02:00
fn read_header ( & mut self , header : & [ u8 ] ) -> Result < ( ) , NetworkError > {
2015-12-02 12:07:46 +01:00
if header . len ( ) ! = ENCRYPTED_HEADER_LEN {
2016-01-10 12:53:55 +01:00
return Err ( From ::from ( NetworkError ::Auth ) ) ;
2015-12-02 12:07:46 +01:00
}
2015-12-03 15:11:40 +01:00
EncryptedConnection ::update_mac ( & mut self . ingress_mac , & mut self . mac_encoder , & header [ 0 .. 16 ] ) ;
2015-12-02 12:07:46 +01:00
let mac = & header [ 16 .. ] ;
2015-12-03 15:11:40 +01:00
let mut expected = H256 ::new ( ) ;
2015-12-02 12:07:46 +01:00
self . ingress_mac . clone ( ) . finalize ( & mut expected ) ;
2015-12-03 15:11:40 +01:00
if mac ! = & expected [ 0 .. 16 ] {
2016-01-10 12:53:55 +01:00
return Err ( From ::from ( NetworkError ::Auth ) ) ;
2015-12-02 12:07:46 +01:00
}
2015-12-03 15:11:40 +01:00
let mut hdec = H128 ::new ( ) ;
self . decoder . decrypt ( & mut RefReadBuffer ::new ( & header [ 0 .. 16 ] ) , & mut RefWriteBuffer ::new ( & mut hdec ) , false ) . expect ( " Invalid length or padding " ) ;
let length = ( ( ( ( hdec [ 0 ] as u32 ) < < 8 ) + ( hdec [ 1 ] as u32 ) ) < < 8 ) + ( hdec [ 2 ] as u32 ) ;
let header_rlp = UntrustedRlp ::new ( & hdec [ 3 .. 6 ] ) ;
2016-12-27 12:53:56 +01:00
let protocol_id = header_rlp . val_at ::< u16 > ( 0 ) ? ;
2015-12-02 12:07:46 +01:00
2016-01-10 14:02:01 +01:00
self . payload_len = length as usize ;
2015-12-02 12:07:46 +01:00
self . protocol_id = protocol_id ;
self . read_state = EncryptedConnectionState ::Payload ;
let padding = ( 16 - ( length % 16 ) ) % 16 ;
let full_length = length + padding + 16 ;
self . connection . expect ( full_length as usize ) ;
Ok ( ( ) )
}
2016-01-10 22:42:27 +01:00
/// Decrypt and authenticate packet payload.
2016-08-05 10:32:04 +02:00
fn read_payload ( & mut self , payload : & [ u8 ] ) -> Result < Packet , NetworkError > {
2015-12-02 12:07:46 +01:00
let padding = ( 16 - ( self . payload_len % 16 ) ) % 16 ;
2016-01-10 14:02:01 +01:00
let full_length = self . payload_len + padding + 16 ;
2015-12-02 12:07:46 +01:00
if payload . len ( ) ! = full_length {
2016-01-10 12:53:55 +01:00
return Err ( From ::from ( NetworkError ::Auth ) ) ;
2015-12-02 12:07:46 +01:00
}
self . ingress_mac . update ( & payload [ 0 .. payload . len ( ) - 16 ] ) ;
2015-12-03 15:11:40 +01:00
EncryptedConnection ::update_mac ( & mut self . ingress_mac , & mut self . mac_encoder , & [ 0 u8 ; 0 ] ) ;
2015-12-02 12:07:46 +01:00
let mac = & payload [ ( payload . len ( ) - 16 ) .. ] ;
let mut expected = H128 ::new ( ) ;
self . ingress_mac . clone ( ) . finalize ( & mut expected ) ;
if mac ! = & expected [ .. ] {
2016-01-10 12:53:55 +01:00
return Err ( From ::from ( NetworkError ::Auth ) ) ;
2015-12-02 12:07:46 +01:00
}
2016-01-10 14:02:01 +01:00
let mut packet = vec! [ 0 u8 ; self . payload_len ] ;
self . decoder . decrypt ( & mut RefReadBuffer ::new ( & payload [ 0 .. self . payload_len ] ) , & mut RefWriteBuffer ::new ( & mut packet ) , false ) . expect ( " Invalid length or padding " ) ;
let mut pad_buf = [ 0 u8 ; 16 ] ;
self . decoder . decrypt ( & mut RefReadBuffer ::new ( & payload [ self . payload_len .. ( payload . len ( ) - 16 ) ] ) , & mut RefWriteBuffer ::new ( & mut pad_buf ) , false ) . expect ( " Invalid length or padding " ) ;
2015-12-02 20:11:13 +01:00
Ok ( Packet {
protocol : self . protocol_id ,
data : packet
} )
2015-12-02 12:07:46 +01:00
}
2016-01-10 22:42:27 +01:00
/// Update MAC after reading or writing any data.
2015-12-03 15:11:40 +01:00
fn update_mac ( mac : & mut Keccak , mac_encoder : & mut EcbEncryptor < AesSafe256Encryptor , EncPadding < NoPadding > > , seed : & [ u8 ] ) {
let mut prev = H128 ::new ( ) ;
mac . clone ( ) . finalize ( & mut prev ) ;
let mut enc = H128 ::new ( ) ;
2016-10-31 19:58:47 +01:00
mac_encoder . encrypt ( & mut RefReadBuffer ::new ( & prev ) , & mut RefWriteBuffer ::new ( & mut enc ) , true ) . expect ( " Error updating MAC " ) ;
2015-12-03 15:11:40 +01:00
mac_encoder . reset ( ) ;
2016-01-08 13:49:00 +01:00
enc = enc ^ if seed . is_empty ( ) { prev } else { H128 ::from_slice ( seed ) } ;
2015-12-03 15:11:40 +01:00
mac . update ( & enc ) ;
}
2016-01-10 22:42:27 +01:00
/// Readable IO handler. Tracker receive status and returns decoded packet if avaialable.
2016-10-20 14:49:12 +02:00
pub fn readable < Message > ( & mut self , io : & IoContext < Message > ) -> Result < Option < Packet > , NetworkError > where Message : Send + Clone + Sync + 'static {
2016-12-27 12:53:56 +01:00
io . clear_timer ( self . connection . token ) ? ;
2016-06-13 18:55:24 +02:00
if let EncryptedConnectionState ::Header = self . read_state {
2016-12-27 12:53:56 +01:00
if let Some ( data ) = self . connection . readable ( ) ? {
self . read_header ( & data ) ? ;
io . register_timer ( self . connection . token , RECIEVE_PAYLOAD_TIMEOUT ) ? ;
2016-06-13 18:55:24 +02:00
}
} ;
if let EncryptedConnectionState ::Payload = self . read_state {
2016-12-27 12:53:56 +01:00
match self . connection . readable ( ) ? {
2016-06-13 18:55:24 +02:00
Some ( data ) = > {
self . read_state = EncryptedConnectionState ::Header ;
self . connection . expect ( ENCRYPTED_HEADER_LEN ) ;
2016-12-27 12:53:56 +01:00
Ok ( Some ( self . read_payload ( & data ) ? ) )
2016-06-13 18:55:24 +02:00
} ,
None = > Ok ( None )
2015-12-02 12:07:46 +01:00
}
2016-06-13 18:55:24 +02:00
} else {
Ok ( None )
2015-12-02 12:07:46 +01:00
}
}
2016-01-10 22:42:27 +01:00
/// Writable IO handler. Processes send queeue.
2016-10-20 14:49:12 +02:00
pub fn writable < Message > ( & mut self , io : & IoContext < Message > ) -> Result < ( ) , NetworkError > where Message : Send + Clone + Sync + 'static {
2016-12-27 12:53:56 +01:00
self . connection . writable ( io ) ? ;
2015-12-02 12:07:46 +01:00
Ok ( ( ) )
}
}
2015-12-03 15:11:40 +01:00
#[ test ]
2016-01-08 13:49:00 +01:00
pub fn test_encryption ( ) {
2016-08-05 10:32:04 +02:00
use util ::hash ::* ;
2015-12-03 15:11:40 +01:00
use std ::str ::FromStr ;
let key = H256 ::from_str ( " 2212767d793a7a3d66f869ae324dd11bd17044b82c9f463b8a541a4d089efec5 " ) . unwrap ( ) ;
let before = H128 ::from_str ( " 12532abaec065082a3cf1da7d0136f15 " ) . unwrap ( ) ;
let before2 = H128 ::from_str ( " 7e99f682356fdfbc6b67a9562787b18a " ) . unwrap ( ) ;
let after = H128 ::from_str ( " 89464c6b04e7c99e555c81d3f7266a05 " ) . unwrap ( ) ;
let after2 = H128 ::from_str ( " 85c070030589ef9c7a2879b3a8489316 " ) . unwrap ( ) ;
let mut got = H128 ::new ( ) ;
let mut encoder = EcbEncryptor ::new ( AesSafe256Encryptor ::new ( & key ) , NoPadding ) ;
encoder . encrypt ( & mut RefReadBuffer ::new ( & before ) , & mut RefWriteBuffer ::new ( & mut got ) , true ) . unwrap ( ) ;
encoder . reset ( ) ;
assert_eq! ( got , after ) ;
got = H128 ::new ( ) ;
encoder . encrypt ( & mut RefReadBuffer ::new ( & before2 ) , & mut RefWriteBuffer ::new ( & mut got ) , true ) . unwrap ( ) ;
encoder . reset ( ) ;
assert_eq! ( got , after2 ) ;
2016-02-03 16:05:46 +01:00
}
#[ cfg(test) ]
mod tests {
use super ::* ;
2016-07-13 19:59:59 +02:00
use std ::sync ::Arc ;
2016-06-13 18:55:24 +02:00
use std ::sync ::atomic ::AtomicBool ;
2016-02-03 16:05:46 +01:00
use super ::super ::stats ::* ;
2016-02-03 16:59:35 +01:00
use std ::io ::{ Read , Write , Error , Cursor , ErrorKind } ;
2016-10-30 09:56:34 +01:00
use mio ::{ Ready } ;
2016-02-03 16:05:46 +01:00
use std ::collections ::VecDeque ;
2016-11-28 17:05:37 +01:00
use util ::bytes ::Bytes ;
2016-03-26 17:08:06 +01:00
use devtools ::* ;
2016-06-13 18:55:24 +02:00
use io ::* ;
2016-02-03 16:05:46 +01:00
impl GenericSocket for TestSocket { }
2016-02-03 16:59:35 +01:00
struct TestBrokenSocket {
error : String
}
impl Read for TestBrokenSocket {
fn read ( & mut self , _ : & mut [ u8 ] ) -> Result < usize , Error > {
Err ( Error ::new ( ErrorKind ::Other , self . error . clone ( ) ) )
}
}
impl Write for TestBrokenSocket {
fn write ( & mut self , _ : & [ u8 ] ) -> Result < usize , Error > {
Err ( Error ::new ( ErrorKind ::Other , self . error . clone ( ) ) )
}
fn flush ( & mut self ) -> Result < ( ) , Error > {
unimplemented! ( ) ;
}
}
impl GenericSocket for TestBrokenSocket { }
2016-02-03 16:05:46 +01:00
type TestConnection = GenericConnection < TestSocket > ;
2016-03-12 10:07:55 +01:00
impl Default for TestConnection {
fn default ( ) -> Self {
TestConnection ::new ( )
}
}
2016-02-03 16:05:46 +01:00
impl TestConnection {
2016-03-12 10:07:55 +01:00
pub fn new ( ) -> Self {
2016-02-03 16:05:46 +01:00
TestConnection {
token : 999998888 usize ,
socket : TestSocket ::new ( ) ,
send_queue : VecDeque ::new ( ) ,
rec_buf : Bytes ::new ( ) ,
rec_size : 0 ,
2016-10-30 09:56:34 +01:00
interest : Ready ::hup ( ) | Ready ::readable ( ) ,
2016-02-03 16:05:46 +01:00
stats : Arc ::< NetworkStats > ::new ( NetworkStats ::new ( ) ) ,
2016-06-13 18:55:24 +02:00
registered : AtomicBool ::new ( false ) ,
2016-02-03 16:05:46 +01:00
}
}
}
2016-02-03 16:59:35 +01:00
type TestBrokenConnection = GenericConnection < TestBrokenSocket > ;
2016-03-12 10:07:55 +01:00
impl Default for TestBrokenConnection {
fn default ( ) -> Self {
TestBrokenConnection ::new ( )
}
}
2016-02-03 16:59:35 +01:00
impl TestBrokenConnection {
2016-03-12 10:07:55 +01:00
pub fn new ( ) -> Self {
2016-02-03 16:59:35 +01:00
TestBrokenConnection {
token : 999998888 usize ,
socket : TestBrokenSocket { error : " test broken socket " . to_owned ( ) } ,
send_queue : VecDeque ::new ( ) ,
rec_buf : Bytes ::new ( ) ,
rec_size : 0 ,
2016-10-30 09:56:34 +01:00
interest : Ready ::hup ( ) | Ready ::readable ( ) ,
2016-02-03 16:59:35 +01:00
stats : Arc ::< NetworkStats > ::new ( NetworkStats ::new ( ) ) ,
2016-06-13 18:55:24 +02:00
registered : AtomicBool ::new ( false ) ,
2016-02-03 16:59:35 +01:00
}
}
}
2016-06-13 18:55:24 +02:00
fn test_io ( ) -> IoContext < i32 > {
IoContext ::new ( IoChannel ::disconnected ( ) , 0 )
}
2016-02-03 16:05:46 +01:00
#[ test ]
fn connection_expect ( ) {
let mut connection = TestConnection ::new ( ) ;
connection . expect ( 1024 ) ;
2016-02-03 16:40:54 +01:00
assert_eq! ( 1024 , connection . rec_size ) ;
}
#[ test ]
fn connection_write_empty ( ) {
let mut connection = TestConnection ::new ( ) ;
2016-06-13 18:55:24 +02:00
let status = connection . writable ( & test_io ( ) ) ;
2016-02-03 16:40:54 +01:00
assert! ( status . is_ok ( ) ) ;
assert! ( WriteStatus ::Complete = = status . unwrap ( ) ) ;
}
#[ test ]
fn connection_write ( ) {
let mut connection = TestConnection ::new ( ) ;
let data = Cursor ::new ( vec! [ 0 ; 10240 ] ) ;
connection . send_queue . push_back ( data ) ;
2016-06-13 18:55:24 +02:00
let status = connection . writable ( & test_io ( ) ) ;
2016-02-03 16:40:54 +01:00
assert! ( status . is_ok ( ) ) ;
assert! ( WriteStatus ::Complete = = status . unwrap ( ) ) ;
assert_eq! ( 10240 , connection . socket . write_buffer . len ( ) ) ;
}
#[ test ]
fn connection_write_is_buffered ( ) {
let mut connection = TestConnection ::new ( ) ;
connection . socket = TestSocket ::new_buf ( 1024 ) ;
let data = Cursor ::new ( vec! [ 0 ; 10240 ] ) ;
connection . send_queue . push_back ( data ) ;
2016-06-13 18:55:24 +02:00
let status = connection . writable ( & test_io ( ) ) ;
2016-02-03 16:40:54 +01:00
assert! ( status . is_ok ( ) ) ;
assert! ( WriteStatus ::Ongoing = = status . unwrap ( ) ) ;
assert_eq! ( 1024 , connection . socket . write_buffer . len ( ) ) ;
2016-02-03 16:05:46 +01:00
}
2016-02-03 16:59:35 +01:00
#[ test ]
2016-02-03 19:01:39 +01:00
fn connection_write_to_broken ( ) {
2016-02-03 16:59:35 +01:00
let mut connection = TestBrokenConnection ::new ( ) ;
let data = Cursor ::new ( vec! [ 0 ; 10240 ] ) ;
connection . send_queue . push_back ( data ) ;
2016-06-13 18:55:24 +02:00
let status = connection . writable ( & test_io ( ) ) ;
2016-02-03 16:59:35 +01:00
assert! ( ! status . is_ok ( ) ) ;
assert_eq! ( 1 , connection . send_queue . len ( ) ) ;
}
2016-02-03 19:01:39 +01:00
#[ test ]
fn connection_read ( ) {
let mut connection = TestConnection ::new ( ) ;
connection . rec_size = 2048 ;
connection . rec_buf = vec! [ 10 ; 1024 ] ;
connection . socket . read_buffer = vec! [ 99 ; 2048 ] ;
let status = connection . readable ( ) ;
assert! ( status . is_ok ( ) ) ;
assert_eq! ( 1024 , connection . socket . cursor ) ;
}
#[ test ]
fn connection_read_from_broken ( ) {
let mut connection = TestBrokenConnection ::new ( ) ;
connection . rec_size = 2048 ;
let status = connection . readable ( ) ;
assert! ( ! status . is_ok ( ) ) ;
assert_eq! ( 0 , connection . rec_buf . len ( ) ) ;
}
#[ test ]
fn connection_read_nothing ( ) {
let mut connection = TestConnection ::new ( ) ;
connection . rec_size = 2048 ;
let status = connection . readable ( ) ;
assert! ( status . is_ok ( ) ) ;
assert_eq! ( 0 , connection . rec_buf . len ( ) ) ;
}
#[ test ]
fn connection_read_full ( ) {
let mut connection = TestConnection ::new ( ) ;
connection . rec_size = 1024 ;
connection . rec_buf = vec! [ 76 ; 1024 ] ;
let status = connection . readable ( ) ;
assert! ( status . is_ok ( ) ) ;
assert_eq! ( 0 , connection . socket . cursor ) ;
}
2016-02-03 17:00:05 +01:00
}