2019-01-07 11:33:07 +01:00
// Copyright 2015-2019 Parity Technologies (UK) Ltd.
// This file is part of Parity Ethereum.
2016-02-05 13:40:41 +01:00
2019-01-07 11:33:07 +01:00
// Parity Ethereum is free software: you can redistribute it and/or modify
2016-02-05 13:40:41 +01:00
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
2019-01-07 11:33:07 +01:00
// Parity Ethereum is distributed in the hope that it will be useful,
2016-02-05 13:40:41 +01:00
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
2019-01-07 11:33:07 +01:00
// along with Parity Ethereum. If not, see <http://www.gnu.org/licenses/>.
2016-02-05 13:40:41 +01:00
2015-12-02 20:11:13 +01:00
use std ::collections ::VecDeque ;
2016-02-15 19:54:27 +01:00
use std ::net ::SocketAddr ;
2016-06-13 18:55:24 +02:00
use std ::sync ::atomic ::{ AtomicBool , Ordering as AtomicOrdering } ;
2018-04-14 21:35:58 +02:00
use std ::time ::Duration ;
2017-08-31 12:38:53 +02:00
use hash ::{ keccak , write_keccak } ;
2016-10-30 09:56:34 +01:00
use mio ::{ Token , Ready , PollOpt } ;
use mio ::deprecated ::{ Handler , EventLoop , TryRead , TryWrite } ;
2015-11-30 16:38:55 +01:00
use mio ::tcp ::* ;
2018-01-10 13:35:18 +01:00
use ethereum_types ::{ H128 , H256 , H512 } ;
Delete crates from parity-ethereum and fetch them from parity-common instead (#9083)
Use crates from parity-common: hashdb, keccak-hash, kvdb, kvdb-memorydb, kvdb-rocksdb, memorydb, parity-bytes, parity-crypto, path, patricia_trie, plain_hasher, rlp, target, test-support, trie-standardmap, triehash
2018-07-10 14:59:19 +02:00
use parity_bytes ::* ;
2018-04-16 15:52:12 +02:00
use rlp ::{ Rlp , RlpStream } ;
2016-02-03 01:55:18 +01:00
use std ::io ::{ self , Cursor , Read , Write } ;
2016-01-21 16:48:37 +01:00
use io ::{ IoContext , StreamToken } ;
2016-08-05 10:32:04 +02:00
use handshake ::Handshake ;
2015-12-02 12:07:46 +01:00
use rcrypto ::blockmodes ::* ;
use rcrypto ::aessafe ::* ;
use rcrypto ::symmetriccipher ::* ;
use rcrypto ::buffer ::* ;
use tiny_keccak ::Keccak ;
2017-07-06 11:36:15 +02:00
use bytes ::{ Buf , BufMut } ;
2018-05-05 11:02:33 +02:00
use ethkey ::crypto ;
2018-03-05 11:56:35 +01:00
use network ::{ Error , ErrorKind } ;
2015-12-02 12:07:46 +01:00
const ENCRYPTED_HEADER_LEN : usize = 32 ;
2018-04-14 21:35:58 +02:00
const RECEIVE_PAYLOAD : Duration = Duration ::from_secs ( 30 ) ;
2017-10-19 14:41:11 +02:00
pub const MAX_PAYLOAD_SIZE : usize = ( 1 < < 24 ) - 1 ;
2015-11-30 16:38:55 +01:00
2019-01-04 19:58:21 +01:00
/// Network responses should try not to go over this limit.
/// This should be lower than MAX_PAYLOAD_SIZE
pub const PAYLOAD_SOFT_LIMIT : usize = ( 1 < < 22 ) - 1 ;
2016-02-03 12:04:24 +01:00
pub trait GenericSocket : Read + Write {
2016-02-03 01:55:18 +01:00
}
impl GenericSocket for TcpStream {
}
pub struct GenericConnection < Socket : GenericSocket > {
2016-01-10 22:42:27 +01:00
/// Connection id (token)
2016-01-21 16:48:37 +01:00
pub token : StreamToken ,
2016-01-10 22:42:27 +01:00
/// Network socket
2016-02-03 01:55:18 +01:00
pub socket : Socket ,
2016-01-10 22:42:27 +01:00
/// Receive buffer
2015-11-30 16:38:55 +01:00
rec_buf : Bytes ,
2016-01-10 22:42:27 +01:00
/// Expected size
2015-11-30 16:38:55 +01:00
rec_size : usize ,
2016-01-10 22:42:27 +01:00
/// Send out packets FIFO
2015-12-02 20:11:13 +01:00
send_queue : VecDeque < Cursor < Bytes > > ,
2016-01-10 22:42:27 +01:00
/// Event flags this connection expects
2016-10-30 09:56:34 +01:00
interest : Ready ,
2016-06-13 18:55:24 +02:00
/// Registered flag
registered : AtomicBool ,
2015-11-30 16:38:55 +01:00
}
2016-02-03 01:55:18 +01:00
impl < Socket : GenericSocket > GenericConnection < Socket > {
2015-12-02 20:11:13 +01:00
pub fn expect ( & mut self , size : usize ) {
2016-06-13 18:55:24 +02:00
trace! ( target :" network " , " Expect to read {} bytes " , size ) ;
2015-12-02 20:11:13 +01:00
if self . rec_size ! = self . rec_buf . len ( ) {
2016-06-13 18:55:24 +02:00
warn! ( target :" network " , " Unexpected connection read start " ) ;
2015-12-02 20:11:13 +01:00
}
self . rec_size = size ;
}
2016-01-10 22:42:27 +01:00
/// Readable IO handler. Called when there is some data to be read.
2015-12-02 20:11:13 +01:00
pub fn readable ( & mut self ) -> io ::Result < Option < Bytes > > {
if self . rec_size = = 0 | | self . rec_buf . len ( ) > = self . rec_size {
2016-07-26 16:04:14 +02:00
return Ok ( None ) ;
2015-12-02 20:11:13 +01:00
}
2016-02-03 01:55:18 +01:00
let sock_ref = < Socket as Read > ::by_ref ( & mut self . socket ) ;
2016-06-13 18:55:24 +02:00
loop {
let max = self . rec_size - self . rec_buf . len ( ) ;
2017-07-06 11:36:15 +02:00
match sock_ref . take ( max as u64 ) . try_read ( unsafe { self . rec_buf . bytes_mut ( ) } ) {
2016-06-13 18:55:24 +02:00
Ok ( Some ( size ) ) if size ! = 0 = > {
2017-07-06 11:36:15 +02:00
unsafe { self . rec_buf . advance_mut ( size ) ; }
2016-06-13 18:55:24 +02:00
trace! ( target :" network " , " {}: Read {} of {} bytes " , self . token , self . rec_buf . len ( ) , self . rec_size ) ;
if self . rec_size ! = 0 & & self . rec_buf . len ( ) = = self . rec_size {
self . rec_size = 0 ;
return Ok ( Some ( ::std ::mem ::replace ( & mut self . rec_buf , Bytes ::new ( ) ) ) )
}
else if self . rec_buf . len ( ) > self . rec_size {
warn! ( target :" network " , " Read past buffer {} bytes " , self . rec_buf . len ( ) - self . rec_size ) ;
return Ok ( Some ( ::std ::mem ::replace ( & mut self . rec_buf , Bytes ::new ( ) ) ) )
}
} ,
Ok ( _ ) = > return Ok ( None ) ,
2016-07-13 19:59:59 +02:00
Err ( e ) = > {
2016-06-13 18:55:24 +02:00
debug! ( target :" network " , " Read error {} ({}) " , self . token , e ) ;
return Err ( e )
}
}
}
2016-07-13 19:59:59 +02:00
}
2015-12-03 15:11:40 +01:00
2016-01-10 22:42:27 +01:00
/// Add a packet to send queue.
2016-10-20 14:49:12 +02:00
pub fn send < Message > ( & mut self , io : & IoContext < Message > , data : Bytes ) where Message : Send + Clone + Sync + 'static {
2016-01-19 12:14:29 +01:00
if ! data . is_empty ( ) {
2016-10-18 18:16:00 +02:00
trace! ( target :" network " , " {}: Sending {} bytes " , self . token , data . len ( ) ) ;
2015-12-02 20:11:13 +01:00
self . send_queue . push_back ( Cursor ::new ( data ) ) ;
2016-10-18 18:16:00 +02:00
if ! self . interest . is_writable ( ) {
2016-10-30 09:56:34 +01:00
self . interest . insert ( Ready ::writable ( ) ) ;
2016-10-18 18:16:00 +02:00
}
2016-06-13 18:55:24 +02:00
io . update_registration ( self . token ) . ok ( ) ;
2015-12-03 15:11:40 +01:00
}
2015-12-02 20:11:13 +01:00
}
2016-05-02 14:48:30 +02:00
/// Check if this connection has data to be sent.
pub fn is_sending ( & self ) -> bool {
self . interest . is_writable ( )
}
2016-01-10 22:42:27 +01:00
/// Writable IO handler. Called when the socket is ready to send.
2017-11-13 14:37:08 +01:00
pub fn writable < Message > ( & mut self , io : & IoContext < Message > ) -> Result < WriteStatus , Error > where Message : Send + Clone + Sync + 'static {
2015-12-02 20:11:13 +01:00
{
2016-10-31 19:58:47 +01:00
let buf = match self . send_queue . front_mut ( ) {
Some ( buf ) = > buf ,
None = > return Ok ( WriteStatus ::Complete ) ,
} ;
2015-12-02 20:11:13 +01:00
let send_size = buf . get_ref ( ) . len ( ) ;
2016-10-30 09:56:34 +01:00
let pos = buf . position ( ) as usize ;
if ( pos as usize ) > = send_size {
2015-12-02 20:11:13 +01:00
warn! ( target :" net " , " Unexpected connection data " ) ;
return Ok ( WriteStatus ::Complete )
}
2017-07-06 11:36:15 +02:00
match self . socket . try_write ( Buf ::bytes ( & buf ) ) {
2016-10-30 09:56:34 +01:00
Ok ( Some ( size ) ) if ( pos + size ) < send_size = > {
buf . advance ( size ) ;
2015-12-02 20:11:13 +01:00
Ok ( WriteStatus ::Ongoing )
} ,
2016-10-30 09:56:34 +01:00
Ok ( Some ( size ) ) if ( pos + size ) = = send_size = > {
2016-06-18 15:10:24 +02:00
trace! ( target :" network " , " {}: Wrote {} bytes " , self . token , send_size ) ;
2015-12-02 20:11:13 +01:00
Ok ( WriteStatus ::Complete )
} ,
2016-01-24 18:53:54 +01:00
Ok ( Some ( _ ) ) = > { panic! ( " Wrote past buffer " ) ; } ,
Ok ( None ) = > Ok ( WriteStatus ::Ongoing ) ,
2016-12-27 12:53:56 +01:00
Err ( e ) = > Err ( e ) ?
2015-12-02 20:11:13 +01:00
}
2015-12-03 15:11:40 +01:00
} . and_then ( | r | {
if r = = WriteStatus ::Complete {
2015-12-02 20:11:13 +01:00
self . send_queue . pop_front ( ) ;
2016-01-08 13:10:00 +01:00
}
2015-12-03 15:11:40 +01:00
if self . send_queue . is_empty ( ) {
2016-10-30 09:56:34 +01:00
self . interest . remove ( Ready ::writable ( ) ) ;
2015-12-03 15:11:40 +01:00
}
2017-01-10 12:11:32 +01:00
io . update_registration ( self . token ) ? ;
2015-12-03 15:11:40 +01:00
Ok ( r )
} )
2015-12-02 20:11:13 +01:00
}
2016-02-03 01:55:18 +01:00
}
/// Low level tcp connection
pub type Connection = GenericConnection < TcpStream > ;
impl Connection {
/// Create a new connection with given id and socket.
2018-03-28 08:45:36 +02:00
pub fn new ( token : StreamToken , socket : TcpStream ) -> Connection {
2016-02-03 01:55:18 +01:00
Connection {
2018-08-21 11:55:31 +02:00
token ,
socket ,
2016-02-03 01:55:18 +01:00
send_queue : VecDeque ::new ( ) ,
rec_buf : Bytes ::new ( ) ,
rec_size : 0 ,
2016-10-30 09:56:34 +01:00
interest : Ready ::hup ( ) | Ready ::readable ( ) ,
2016-06-13 18:55:24 +02:00
registered : AtomicBool ::new ( false ) ,
2016-02-03 01:55:18 +01:00
}
}
2015-12-02 20:11:13 +01:00
2016-03-12 10:07:55 +01:00
/// Get socket token
2016-02-12 09:52:32 +01:00
pub fn token ( & self ) -> StreamToken {
self . token
}
2016-02-15 19:54:27 +01:00
/// Get remote peer address
pub fn remote_addr ( & self ) -> io ::Result < SocketAddr > {
self . socket . peer_addr ( )
}
2016-06-02 11:49:56 +02:00
/// Get remote peer address string
pub fn remote_addr_str ( & self ) -> String {
self . socket . peer_addr ( ) . map ( | a | a . to_string ( ) ) . unwrap_or_else ( | _ | " Unknown " . to_owned ( ) )
}
2016-10-12 20:18:59 +02:00
/// Get local peer address string
pub fn local_addr_str ( & self ) -> String {
self . socket . local_addr ( ) . map ( | a | a . to_string ( ) ) . unwrap_or_else ( | _ | " Unknown " . to_owned ( ) )
}
2016-02-21 16:52:25 +01:00
/// Clone this connection. Clears the receiving buffer of the returned connection.
2016-02-19 22:09:06 +01:00
pub fn try_clone ( & self ) -> io ::Result < Self > {
Ok ( Connection {
token : self . token ,
2016-12-27 12:53:56 +01:00
socket : self . socket . try_clone ( ) ? ,
2016-02-19 22:09:06 +01:00
rec_buf : Vec ::new ( ) ,
rec_size : 0 ,
2016-02-20 01:10:27 +01:00
send_queue : self . send_queue . clone ( ) ,
2016-10-30 09:56:34 +01:00
interest : Ready ::hup ( ) ,
2016-06-13 18:55:24 +02:00
registered : AtomicBool ::new ( false ) ,
2016-02-19 22:09:06 +01:00
} )
}
2016-01-10 22:42:27 +01:00
/// Register this connection with the IO event loop.
2016-01-21 16:48:37 +01:00
pub fn register_socket < Host : Handler > ( & self , reg : Token , event_loop : & mut EventLoop < Host > ) -> io ::Result < ( ) > {
2016-06-13 18:55:24 +02:00
if self . registered . load ( AtomicOrdering ::SeqCst ) {
return Ok ( ( ) ) ;
}
2016-03-05 23:09:51 +01:00
trace! ( target : " network " , " connection register; token={:?} " , reg ) ;
2016-06-02 11:49:56 +02:00
if let Err ( e ) = event_loop . register ( & self . socket , reg , self . interest , PollOpt ::edge ( ) /* | PollOpt::oneshot() */ ) { // TODO: oneshot is broken on windows
2016-03-05 23:09:51 +01:00
trace! ( target : " network " , " Failed to register {:?}, {:?} " , reg , e ) ;
2016-02-20 01:10:27 +01:00
}
2016-06-13 18:55:24 +02:00
self . registered . store ( true , AtomicOrdering ::SeqCst ) ;
2016-02-20 01:10:27 +01:00
Ok ( ( ) )
2015-12-03 15:11:40 +01:00
}
2016-01-10 22:42:27 +01:00
/// Update connection registration. Should be called at the end of the IO handler.
2016-01-21 16:48:37 +01:00
pub fn update_socket < Host : Handler > ( & self , reg : Token , event_loop : & mut EventLoop < Host > ) -> io ::Result < ( ) > {
2016-03-05 23:09:51 +01:00
trace! ( target : " network " , " connection reregister; token={:?} " , reg ) ;
2016-06-13 18:55:24 +02:00
if ! self . registered . load ( AtomicOrdering ::SeqCst ) {
self . register_socket ( reg , event_loop )
} else {
event_loop . reregister ( & self . socket , reg , self . interest , PollOpt ::edge ( ) /* | PollOpt::oneshot() */ ) . unwrap_or_else ( | e | { // TODO: oneshot is broken on windows
trace! ( target : " network " , " Failed to reregister {:?}, {:?} " , reg , e ) ;
} ) ;
2016-01-23 02:36:58 +01:00
Ok ( ( ) )
2016-06-13 18:55:24 +02:00
}
2015-12-03 15:11:40 +01:00
}
2016-01-22 18:13:59 +01:00
/// Delete connection registration. Should be called at the end of the IO handler.
pub fn deregister_socket < Host : Handler > ( & self , event_loop : & mut EventLoop < Host > ) -> io ::Result < ( ) > {
2016-03-05 23:09:51 +01:00
trace! ( target : " network " , " connection deregister; token={:?} " , self . token ) ;
2016-01-22 18:13:59 +01:00
event_loop . deregister ( & self . socket ) . ok ( ) ; // ignore errors here
Ok ( ( ) )
}
2015-12-02 20:11:13 +01:00
}
2016-02-03 01:55:18 +01:00
/// Connection write status.
#[ derive(PartialEq, Eq) ]
pub enum WriteStatus {
/// Some data is still pending for current packet
Ongoing ,
/// All data sent.
Complete
}
2016-04-06 10:07:24 +02:00
/// `RLPx` packet
2015-12-02 20:11:13 +01:00
pub struct Packet {
pub protocol : u16 ,
pub data : Bytes ,
}
2016-01-10 22:42:27 +01:00
/// Encrypted connection receiving state.
2015-12-02 12:07:46 +01:00
enum EncryptedConnectionState {
2016-01-10 22:42:27 +01:00
/// Reading a header.
2015-12-02 12:07:46 +01:00
Header ,
2016-01-10 22:42:27 +01:00
/// Reading the rest of the packet.
2015-12-02 12:07:46 +01:00
Payload ,
}
2016-04-06 10:07:24 +02:00
/// Connection implementing `RLPx` framing
2016-01-10 22:42:27 +01:00
/// https://github.com/ethereum/devp2p/blob/master/rlpx.md#framing
2015-12-02 12:07:46 +01:00
pub struct EncryptedConnection {
2016-01-10 22:42:27 +01:00
/// Underlying tcp connection
2016-06-02 11:49:56 +02:00
pub connection : Connection ,
2016-01-10 22:42:27 +01:00
/// Egress data encryptor
2015-12-03 15:11:40 +01:00
encoder : CtrMode < AesSafe256Encryptor > ,
2016-01-10 22:42:27 +01:00
/// Ingress data decryptor
2015-12-03 15:11:40 +01:00
decoder : CtrMode < AesSafe256Encryptor > ,
2016-01-10 22:42:27 +01:00
/// Ingress data decryptor
2015-12-03 15:11:40 +01:00
mac_encoder : EcbEncryptor < AesSafe256Encryptor , EncPadding < NoPadding > > ,
2016-01-10 22:42:27 +01:00
/// MAC for egress data
2015-12-02 12:07:46 +01:00
egress_mac : Keccak ,
2016-01-10 22:42:27 +01:00
/// MAC for ingress data
2015-12-02 12:07:46 +01:00
ingress_mac : Keccak ,
2016-01-10 22:42:27 +01:00
/// Read state
2015-12-02 12:07:46 +01:00
read_state : EncryptedConnectionState ,
2016-01-10 22:42:27 +01:00
/// Protocol id for the last received packet
2015-12-02 12:07:46 +01:00
protocol_id : u16 ,
2016-01-10 22:42:27 +01:00
/// Payload expected to be received for the last header.
2016-01-10 14:02:01 +01:00
payload_len : usize ,
2015-12-02 12:07:46 +01:00
}
impl EncryptedConnection {
2016-06-13 18:55:24 +02:00
/// Create an encrypted connection out of the handshake.
2017-11-13 14:37:08 +01:00
pub fn new ( handshake : & mut Handshake ) -> Result < EncryptedConnection , Error > {
2016-12-27 12:53:56 +01:00
let shared = crypto ::ecdh ::agree ( handshake . ecdhe . secret ( ) , & handshake . remote_ephemeral ) ? ;
2019-06-03 15:36:21 +02:00
let mut nonce_material = H512 ::default ( ) ;
2015-12-02 12:07:46 +01:00
if handshake . originated {
2019-06-03 15:36:21 +02:00
( & mut nonce_material [ 0 .. 32 ] ) . copy_from_slice ( handshake . remote_nonce . as_bytes ( ) ) ;
( & mut nonce_material [ 32 .. 64 ] ) . copy_from_slice ( handshake . nonce . as_bytes ( ) ) ;
2015-12-02 12:07:46 +01:00
}
else {
2019-06-03 15:36:21 +02:00
( & mut nonce_material [ 0 .. 32 ] ) . copy_from_slice ( handshake . nonce . as_bytes ( ) ) ;
( & mut nonce_material [ 32 .. 64 ] ) . copy_from_slice ( handshake . remote_nonce . as_bytes ( ) ) ;
2015-12-02 12:07:46 +01:00
}
2019-06-03 15:36:21 +02:00
let mut key_material = H512 ::default ( ) ;
( & mut key_material [ 0 .. 32 ] ) . copy_from_slice ( shared . as_bytes ( ) ) ;
2017-08-31 12:38:53 +02:00
write_keccak ( & nonce_material , & mut key_material [ 32 .. 64 ] ) ;
2019-06-03 15:36:21 +02:00
let key_material_keccak = keccak ( & key_material ) ;
( & mut key_material [ 32 .. 64 ] ) . copy_from_slice ( key_material_keccak . as_bytes ( ) ) ;
let key_material_keccak = keccak ( & key_material ) ;
( & mut key_material [ 32 .. 64 ] ) . copy_from_slice ( key_material_keccak . as_bytes ( ) ) ;
2015-12-02 12:07:46 +01:00
let iv = vec! [ 0 u8 ; 16 ] ;
2015-12-03 15:11:40 +01:00
let encoder = CtrMode ::new ( AesSafe256Encryptor ::new ( & key_material [ 32 .. 64 ] ) , iv ) ;
2015-12-02 12:07:46 +01:00
let iv = vec! [ 0 u8 ; 16 ] ;
2015-12-03 15:11:40 +01:00
let decoder = CtrMode ::new ( AesSafe256Encryptor ::new ( & key_material [ 32 .. 64 ] ) , iv ) ;
2015-12-02 12:07:46 +01:00
2019-06-03 15:36:21 +02:00
let key_material_keccak = keccak ( & key_material ) ;
( & mut key_material [ 32 .. 64 ] ) . copy_from_slice ( key_material_keccak . as_bytes ( ) ) ;
2015-12-03 15:11:40 +01:00
let mac_encoder = EcbEncryptor ::new ( AesSafe256Encryptor ::new ( & key_material [ 32 .. 64 ] ) , NoPadding ) ;
2015-12-02 12:07:46 +01:00
let mut egress_mac = Keccak ::new_keccak256 ( ) ;
2018-08-21 11:55:31 +02:00
let mut mac_material = H256 ::from_slice ( & key_material [ 32 .. 64 ] ) ^ handshake . remote_nonce ;
2019-06-03 15:36:21 +02:00
egress_mac . update ( mac_material . as_bytes ( ) ) ;
2015-12-02 12:07:46 +01:00
egress_mac . update ( if handshake . originated { & handshake . auth_cipher } else { & handshake . ack_cipher } ) ;
2015-12-03 15:11:40 +01:00
2015-12-02 12:07:46 +01:00
let mut ingress_mac = Keccak ::new_keccak256 ( ) ;
2018-08-21 11:55:31 +02:00
mac_material = H256 ::from_slice ( & key_material [ 32 .. 64 ] ) ^ handshake . nonce ;
2019-06-03 15:36:21 +02:00
ingress_mac . update ( mac_material . as_bytes ( ) ) ;
2015-12-02 12:07:46 +01:00
ingress_mac . update ( if handshake . originated { & handshake . ack_cipher } else { & handshake . auth_cipher } ) ;
2016-12-27 12:53:56 +01:00
let old_connection = handshake . connection . try_clone ( ) ? ;
2016-06-02 11:49:56 +02:00
let connection = ::std ::mem ::replace ( & mut handshake . connection , old_connection ) ;
2016-02-19 22:09:06 +01:00
let mut enc = EncryptedConnection {
2018-08-21 11:55:31 +02:00
connection ,
encoder ,
decoder ,
mac_encoder ,
egress_mac ,
ingress_mac ,
2015-12-02 12:07:46 +01:00
read_state : EncryptedConnectionState ::Header ,
protocol_id : 0 ,
2017-10-19 14:41:11 +02:00
payload_len : 0 ,
2016-02-19 22:09:06 +01:00
} ;
enc . connection . expect ( ENCRYPTED_HEADER_LEN ) ;
Ok ( enc )
2015-12-02 12:07:46 +01:00
}
2016-01-10 22:42:27 +01:00
/// Send a packet
2017-11-13 14:37:08 +01:00
pub fn send_packet < Message > ( & mut self , io : & IoContext < Message > , payload : & [ u8 ] ) -> Result < ( ) , Error > where Message : Send + Clone + Sync + 'static {
2015-12-02 12:07:46 +01:00
let mut header = RlpStream ::new ( ) ;
2017-02-17 12:20:25 +01:00
let len = payload . len ( ) ;
2017-10-19 14:41:11 +02:00
if len > MAX_PAYLOAD_SIZE {
2017-11-13 14:37:08 +01:00
bail! ( ErrorKind ::OversizedPacket ) ;
2017-02-17 12:20:25 +01:00
}
2015-12-02 12:07:46 +01:00
header . append_raw ( & [ ( len > > 16 ) as u8 , ( len > > 8 ) as u8 , len as u8 ] , 1 ) ;
header . append_raw ( & [ 0xc2 u8 , 0x80 u8 , 0x80 u8 ] , 1 ) ;
2018-08-17 16:04:03 +02:00
//TODO: get rid of vectors here
2015-12-02 12:07:46 +01:00
let mut header = header . out ( ) ;
let padding = ( 16 - ( payload . len ( ) % 16 ) ) % 16 ;
header . resize ( 16 , 0 u8 ) ;
2018-01-31 09:48:37 +01:00
let mut packet = vec! [ 0 u8 ; 32 + payload . len ( ) + padding + 16 ] ;
2015-12-02 12:07:46 +01:00
self . encoder . encrypt ( & mut RefReadBuffer ::new ( & header ) , & mut RefWriteBuffer ::new ( & mut packet ) , false ) . expect ( " Invalid length or padding " ) ;
2015-12-03 15:11:40 +01:00
EncryptedConnection ::update_mac ( & mut self . egress_mac , & mut self . mac_encoder , & packet [ 0 .. 16 ] ) ;
2015-12-02 12:07:46 +01:00
self . egress_mac . clone ( ) . finalize ( & mut packet [ 16 .. 32 ] ) ;
2016-07-26 20:31:25 +02:00
self . encoder . encrypt ( & mut RefReadBuffer ::new ( payload ) , & mut RefWriteBuffer ::new ( & mut packet [ 32 .. ( 32 + len ) ] ) , padding = = 0 ) . expect ( " Invalid length or padding " ) ;
2015-12-02 12:07:46 +01:00
if padding ! = 0 {
2016-01-10 14:02:01 +01:00
let pad = [ 0 u8 ; 16 ] ;
2015-12-03 15:11:40 +01:00
self . encoder . encrypt ( & mut RefReadBuffer ::new ( & pad [ 0 .. padding ] ) , & mut RefWriteBuffer ::new ( & mut packet [ ( 32 + len ) .. ( 32 + len + padding ) ] ) , true ) . expect ( " Invalid length or padding " ) ;
2015-12-02 12:07:46 +01:00
}
self . egress_mac . update ( & packet [ 32 .. ( 32 + len + padding ) ] ) ;
2015-12-03 15:11:40 +01:00
EncryptedConnection ::update_mac ( & mut self . egress_mac , & mut self . mac_encoder , & [ 0 u8 ; 0 ] ) ;
2015-12-02 12:07:46 +01:00
self . egress_mac . clone ( ) . finalize ( & mut packet [ ( 32 + len + padding ) .. ] ) ;
2016-06-13 18:55:24 +02:00
self . connection . send ( io , packet ) ;
2015-12-02 12:07:46 +01:00
Ok ( ( ) )
}
2016-01-10 22:42:27 +01:00
/// Decrypt and authenticate an incoming packet header. Prepare for receiving payload.
2017-11-13 14:37:08 +01:00
fn read_header ( & mut self , header : & [ u8 ] ) -> Result < ( ) , Error > {
2015-12-02 12:07:46 +01:00
if header . len ( ) ! = ENCRYPTED_HEADER_LEN {
2017-11-13 14:37:08 +01:00
return Err ( ErrorKind ::Auth . into ( ) ) ;
2015-12-02 12:07:46 +01:00
}
2015-12-03 15:11:40 +01:00
EncryptedConnection ::update_mac ( & mut self . ingress_mac , & mut self . mac_encoder , & header [ 0 .. 16 ] ) ;
2015-12-02 12:07:46 +01:00
let mac = & header [ 16 .. ] ;
2019-06-03 15:36:21 +02:00
let mut expected = H256 ::zero ( ) ;
self . ingress_mac . clone ( ) . finalize ( expected . as_bytes_mut ( ) ) ;
2015-12-03 15:11:40 +01:00
if mac ! = & expected [ 0 .. 16 ] {
2017-11-13 14:37:08 +01:00
return Err ( ErrorKind ::Auth . into ( ) ) ;
2015-12-02 12:07:46 +01:00
}
2015-12-03 15:11:40 +01:00
2019-06-03 15:36:21 +02:00
let mut hdec = H128 ::default ( ) ;
self . decoder . decrypt (
& mut RefReadBuffer ::new ( & header [ 0 .. 16 ] ) ,
& mut RefWriteBuffer ::new ( hdec . as_bytes_mut ( ) ) ,
false ,
) . expect ( " Invalid length or padding " ) ;
2015-12-03 15:11:40 +01:00
let length = ( ( ( ( hdec [ 0 ] as u32 ) < < 8 ) + ( hdec [ 1 ] as u32 ) ) < < 8 ) + ( hdec [ 2 ] as u32 ) ;
2018-04-16 15:52:12 +02:00
let header_rlp = Rlp ::new ( & hdec [ 3 .. 6 ] ) ;
2016-12-27 12:53:56 +01:00
let protocol_id = header_rlp . val_at ::< u16 > ( 0 ) ? ;
2015-12-02 12:07:46 +01:00
2016-01-10 14:02:01 +01:00
self . payload_len = length as usize ;
2015-12-02 12:07:46 +01:00
self . protocol_id = protocol_id ;
self . read_state = EncryptedConnectionState ::Payload ;
let padding = ( 16 - ( length % 16 ) ) % 16 ;
let full_length = length + padding + 16 ;
self . connection . expect ( full_length as usize ) ;
Ok ( ( ) )
}
2016-01-10 22:42:27 +01:00
/// Decrypt and authenticate packet payload.
2017-11-13 14:37:08 +01:00
fn read_payload ( & mut self , payload : & [ u8 ] ) -> Result < Packet , Error > {
2015-12-02 12:07:46 +01:00
let padding = ( 16 - ( self . payload_len % 16 ) ) % 16 ;
2016-01-10 14:02:01 +01:00
let full_length = self . payload_len + padding + 16 ;
2015-12-02 12:07:46 +01:00
if payload . len ( ) ! = full_length {
2017-11-13 14:37:08 +01:00
return Err ( ErrorKind ::Auth . into ( ) ) ;
2015-12-02 12:07:46 +01:00
}
self . ingress_mac . update ( & payload [ 0 .. payload . len ( ) - 16 ] ) ;
2015-12-03 15:11:40 +01:00
EncryptedConnection ::update_mac ( & mut self . ingress_mac , & mut self . mac_encoder , & [ 0 u8 ; 0 ] ) ;
2015-12-02 12:07:46 +01:00
let mac = & payload [ ( payload . len ( ) - 16 ) .. ] ;
2019-06-03 15:36:21 +02:00
let mut expected = H128 ::default ( ) ;
self . ingress_mac . clone ( ) . finalize ( expected . as_bytes_mut ( ) ) ;
2015-12-02 12:07:46 +01:00
if mac ! = & expected [ .. ] {
2017-11-13 14:37:08 +01:00
return Err ( ErrorKind ::Auth . into ( ) ) ;
2015-12-02 12:07:46 +01:00
}
2016-01-10 14:02:01 +01:00
let mut packet = vec! [ 0 u8 ; self . payload_len ] ;
self . decoder . decrypt ( & mut RefReadBuffer ::new ( & payload [ 0 .. self . payload_len ] ) , & mut RefWriteBuffer ::new ( & mut packet ) , false ) . expect ( " Invalid length or padding " ) ;
let mut pad_buf = [ 0 u8 ; 16 ] ;
self . decoder . decrypt ( & mut RefReadBuffer ::new ( & payload [ self . payload_len .. ( payload . len ( ) - 16 ) ] ) , & mut RefWriteBuffer ::new ( & mut pad_buf ) , false ) . expect ( " Invalid length or padding " ) ;
2015-12-02 20:11:13 +01:00
Ok ( Packet {
protocol : self . protocol_id ,
data : packet
} )
2015-12-02 12:07:46 +01:00
}
2016-01-10 22:42:27 +01:00
/// Update MAC after reading or writing any data.
2015-12-03 15:11:40 +01:00
fn update_mac ( mac : & mut Keccak , mac_encoder : & mut EcbEncryptor < AesSafe256Encryptor , EncPadding < NoPadding > > , seed : & [ u8 ] ) {
2019-06-03 15:36:21 +02:00
let mut prev = H128 ::default ( ) ;
mac . clone ( ) . finalize ( prev . as_bytes_mut ( ) ) ;
let mut enc = H128 ::default ( ) ;
mac_encoder . encrypt (
& mut RefReadBuffer ::new ( prev . as_bytes ( ) ) ,
& mut RefWriteBuffer ::new ( enc . as_bytes_mut ( ) ) ,
true
) . expect ( " Error updating MAC " ) ;
2015-12-03 15:11:40 +01:00
mac_encoder . reset ( ) ;
2016-01-08 13:49:00 +01:00
enc = enc ^ if seed . is_empty ( ) { prev } else { H128 ::from_slice ( seed ) } ;
2019-06-03 15:36:21 +02:00
mac . update ( enc . as_bytes ( ) ) ;
2015-12-03 15:11:40 +01:00
}
2018-08-17 16:04:03 +02:00
/// Readable IO handler. Tracker receive status and returns decoded packet if available.
2017-11-13 14:37:08 +01:00
pub fn readable < Message > ( & mut self , io : & IoContext < Message > ) -> Result < Option < Packet > , Error > where Message : Send + Clone + Sync + 'static {
2016-12-27 12:53:56 +01:00
io . clear_timer ( self . connection . token ) ? ;
2016-06-13 18:55:24 +02:00
if let EncryptedConnectionState ::Header = self . read_state {
2016-12-27 12:53:56 +01:00
if let Some ( data ) = self . connection . readable ( ) ? {
self . read_header ( & data ) ? ;
2018-04-14 21:35:58 +02:00
io . register_timer ( self . connection . token , RECEIVE_PAYLOAD ) ? ;
2016-06-13 18:55:24 +02:00
}
} ;
if let EncryptedConnectionState ::Payload = self . read_state {
2016-12-27 12:53:56 +01:00
match self . connection . readable ( ) ? {
2016-06-13 18:55:24 +02:00
Some ( data ) = > {
self . read_state = EncryptedConnectionState ::Header ;
self . connection . expect ( ENCRYPTED_HEADER_LEN ) ;
2016-12-27 12:53:56 +01:00
Ok ( Some ( self . read_payload ( & data ) ? ) )
2016-06-13 18:55:24 +02:00
} ,
None = > Ok ( None )
2015-12-02 12:07:46 +01:00
}
2016-06-13 18:55:24 +02:00
} else {
Ok ( None )
2015-12-02 12:07:46 +01:00
}
}
2018-08-17 16:04:03 +02:00
/// Writable IO handler. Processes send queue.
2017-11-13 14:37:08 +01:00
pub fn writable < Message > ( & mut self , io : & IoContext < Message > ) -> Result < ( ) , Error > where Message : Send + Clone + Sync + 'static {
2016-12-27 12:53:56 +01:00
self . connection . writable ( io ) ? ;
2015-12-02 12:07:46 +01:00
Ok ( ( ) )
}
}
2015-12-03 15:11:40 +01:00
#[ test ]
2016-01-08 13:49:00 +01:00
pub fn test_encryption ( ) {
2018-01-10 13:35:18 +01:00
use ethereum_types ::{ H256 , H128 } ;
2015-12-03 15:11:40 +01:00
use std ::str ::FromStr ;
let key = H256 ::from_str ( " 2212767d793a7a3d66f869ae324dd11bd17044b82c9f463b8a541a4d089efec5 " ) . unwrap ( ) ;
let before = H128 ::from_str ( " 12532abaec065082a3cf1da7d0136f15 " ) . unwrap ( ) ;
let before2 = H128 ::from_str ( " 7e99f682356fdfbc6b67a9562787b18a " ) . unwrap ( ) ;
let after = H128 ::from_str ( " 89464c6b04e7c99e555c81d3f7266a05 " ) . unwrap ( ) ;
let after2 = H128 ::from_str ( " 85c070030589ef9c7a2879b3a8489316 " ) . unwrap ( ) ;
2019-06-03 15:36:21 +02:00
let mut got = H128 ::zero ( ) ;
2015-12-03 15:11:40 +01:00
2019-06-03 15:36:21 +02:00
let mut encoder = EcbEncryptor ::new ( AesSafe256Encryptor ::new ( key . as_bytes ( ) ) , NoPadding ) ;
encoder . encrypt ( & mut RefReadBuffer ::new ( before . as_bytes ( ) ) , & mut RefWriteBuffer ::new ( got . as_bytes_mut ( ) ) , true ) . unwrap ( ) ;
2015-12-03 15:11:40 +01:00
encoder . reset ( ) ;
assert_eq! ( got , after ) ;
2019-06-03 15:36:21 +02:00
got = H128 ::zero ( ) ;
encoder . encrypt ( & mut RefReadBuffer ::new ( before2 . as_bytes ( ) ) , & mut RefWriteBuffer ::new ( got . as_bytes_mut ( ) ) , true ) . unwrap ( ) ;
2015-12-03 15:11:40 +01:00
encoder . reset ( ) ;
assert_eq! ( got , after2 ) ;
2016-02-03 16:05:46 +01:00
}
#[ cfg(test) ]
mod tests {
2018-01-19 14:41:34 +01:00
use std ::cmp ;
use std ::collections ::VecDeque ;
use std ::io ::{ Read , Write , Cursor , ErrorKind , Result , Error } ;
2016-06-13 18:55:24 +02:00
use std ::sync ::atomic ::AtomicBool ;
2018-01-19 14:41:34 +01:00
2016-10-30 09:56:34 +01:00
use mio ::{ Ready } ;
Delete crates from parity-ethereum and fetch them from parity-common instead (#9083)
Use crates from parity-common: hashdb, keccak-hash, kvdb, kvdb-memorydb, kvdb-rocksdb, memorydb, parity-bytes, parity-crypto, path, patricia_trie, plain_hasher, rlp, target, test-support, trie-standardmap, triehash
2018-07-10 14:59:19 +02:00
use parity_bytes ::Bytes ;
2016-06-13 18:55:24 +02:00
use io ::* ;
2018-01-19 14:41:34 +01:00
use super ::* ;
pub struct TestSocket {
pub read_buffer : Vec < u8 > ,
pub write_buffer : Vec < u8 > ,
pub cursor : usize ,
pub buf_size : usize ,
}
impl Default for TestSocket {
fn default ( ) -> Self {
TestSocket ::new ( )
}
}
impl TestSocket {
pub fn new ( ) -> Self {
TestSocket {
read_buffer : vec ! [ ] ,
write_buffer : vec ! [ ] ,
cursor : 0 ,
buf_size : 0 ,
}
}
pub fn new_buf ( buf_size : usize ) -> TestSocket {
TestSocket {
read_buffer : vec ! [ ] ,
write_buffer : vec ! [ ] ,
cursor : 0 ,
2018-08-21 11:55:31 +02:00
buf_size ,
2018-01-19 14:41:34 +01:00
}
}
}
impl Read for TestSocket {
fn read ( & mut self , buf : & mut [ u8 ] ) -> Result < usize > {
let end_position = cmp ::min ( self . read_buffer . len ( ) , self . cursor + buf . len ( ) ) ;
if self . cursor > end_position { return Ok ( 0 ) }
let len = cmp ::max ( end_position - self . cursor , 0 ) ;
match len {
0 = > Ok ( 0 ) ,
_ = > {
for i in self . cursor .. end_position {
buf [ i - self . cursor ] = self . read_buffer [ i ] ;
}
self . cursor = end_position ;
Ok ( len )
}
}
}
}
impl Write for TestSocket {
fn write ( & mut self , buf : & [ u8 ] ) -> Result < usize > {
if self . buf_size = = 0 | | buf . len ( ) < self . buf_size {
self . write_buffer . extend ( buf . iter ( ) . cloned ( ) ) ;
Ok ( buf . len ( ) )
}
else {
self . write_buffer . extend ( buf . iter ( ) . take ( self . buf_size ) . cloned ( ) ) ;
Ok ( self . buf_size )
}
}
fn flush ( & mut self ) -> Result < ( ) > {
unimplemented! ( ) ;
}
}
2016-02-03 16:05:46 +01:00
impl GenericSocket for TestSocket { }
2016-02-03 16:59:35 +01:00
struct TestBrokenSocket {
error : String
}
impl Read for TestBrokenSocket {
2018-01-19 14:41:34 +01:00
fn read ( & mut self , _ : & mut [ u8 ] ) -> Result < usize > {
2016-02-03 16:59:35 +01:00
Err ( Error ::new ( ErrorKind ::Other , self . error . clone ( ) ) )
}
}
impl Write for TestBrokenSocket {
2018-01-19 14:41:34 +01:00
fn write ( & mut self , _ : & [ u8 ] ) -> Result < usize > {
2016-02-03 16:59:35 +01:00
Err ( Error ::new ( ErrorKind ::Other , self . error . clone ( ) ) )
}
2018-01-19 14:41:34 +01:00
fn flush ( & mut self ) -> Result < ( ) > {
2016-02-03 16:59:35 +01:00
unimplemented! ( ) ;
}
}
impl GenericSocket for TestBrokenSocket { }
2016-02-03 16:05:46 +01:00
type TestConnection = GenericConnection < TestSocket > ;
2016-03-12 10:07:55 +01:00
impl Default for TestConnection {
fn default ( ) -> Self {
TestConnection ::new ( )
}
}
2016-02-03 16:05:46 +01:00
impl TestConnection {
2016-03-12 10:07:55 +01:00
pub fn new ( ) -> Self {
2016-02-03 16:05:46 +01:00
TestConnection {
token : 999998888 usize ,
socket : TestSocket ::new ( ) ,
send_queue : VecDeque ::new ( ) ,
rec_buf : Bytes ::new ( ) ,
rec_size : 0 ,
2016-10-30 09:56:34 +01:00
interest : Ready ::hup ( ) | Ready ::readable ( ) ,
2016-06-13 18:55:24 +02:00
registered : AtomicBool ::new ( false ) ,
2016-02-03 16:05:46 +01:00
}
}
}
2016-02-03 16:59:35 +01:00
type TestBrokenConnection = GenericConnection < TestBrokenSocket > ;
2016-03-12 10:07:55 +01:00
impl Default for TestBrokenConnection {
fn default ( ) -> Self {
TestBrokenConnection ::new ( )
}
}
2016-02-03 16:59:35 +01:00
impl TestBrokenConnection {
2016-03-12 10:07:55 +01:00
pub fn new ( ) -> Self {
2016-02-03 16:59:35 +01:00
TestBrokenConnection {
token : 999998888 usize ,
socket : TestBrokenSocket { error : " test broken socket " . to_owned ( ) } ,
send_queue : VecDeque ::new ( ) ,
rec_buf : Bytes ::new ( ) ,
rec_size : 0 ,
2016-10-30 09:56:34 +01:00
interest : Ready ::hup ( ) | Ready ::readable ( ) ,
2016-06-13 18:55:24 +02:00
registered : AtomicBool ::new ( false ) ,
2016-02-03 16:59:35 +01:00
}
}
}
2016-06-13 18:55:24 +02:00
fn test_io ( ) -> IoContext < i32 > {
IoContext ::new ( IoChannel ::disconnected ( ) , 0 )
}
2016-02-03 16:05:46 +01:00
#[ test ]
fn connection_expect ( ) {
let mut connection = TestConnection ::new ( ) ;
connection . expect ( 1024 ) ;
2016-02-03 16:40:54 +01:00
assert_eq! ( 1024 , connection . rec_size ) ;
}
#[ test ]
fn connection_write_empty ( ) {
let mut connection = TestConnection ::new ( ) ;
2016-06-13 18:55:24 +02:00
let status = connection . writable ( & test_io ( ) ) ;
2016-02-03 16:40:54 +01:00
assert! ( status . is_ok ( ) ) ;
assert! ( WriteStatus ::Complete = = status . unwrap ( ) ) ;
}
#[ test ]
fn connection_write ( ) {
let mut connection = TestConnection ::new ( ) ;
let data = Cursor ::new ( vec! [ 0 ; 10240 ] ) ;
connection . send_queue . push_back ( data ) ;
2016-06-13 18:55:24 +02:00
let status = connection . writable ( & test_io ( ) ) ;
2016-02-03 16:40:54 +01:00
assert! ( status . is_ok ( ) ) ;
assert! ( WriteStatus ::Complete = = status . unwrap ( ) ) ;
assert_eq! ( 10240 , connection . socket . write_buffer . len ( ) ) ;
}
#[ test ]
fn connection_write_is_buffered ( ) {
let mut connection = TestConnection ::new ( ) ;
connection . socket = TestSocket ::new_buf ( 1024 ) ;
let data = Cursor ::new ( vec! [ 0 ; 10240 ] ) ;
connection . send_queue . push_back ( data ) ;
2016-06-13 18:55:24 +02:00
let status = connection . writable ( & test_io ( ) ) ;
2016-02-03 16:40:54 +01:00
assert! ( status . is_ok ( ) ) ;
assert! ( WriteStatus ::Ongoing = = status . unwrap ( ) ) ;
assert_eq! ( 1024 , connection . socket . write_buffer . len ( ) ) ;
2016-02-03 16:05:46 +01:00
}
2016-02-03 16:59:35 +01:00
#[ test ]
2016-02-03 19:01:39 +01:00
fn connection_write_to_broken ( ) {
2016-02-03 16:59:35 +01:00
let mut connection = TestBrokenConnection ::new ( ) ;
let data = Cursor ::new ( vec! [ 0 ; 10240 ] ) ;
connection . send_queue . push_back ( data ) ;
2016-06-13 18:55:24 +02:00
let status = connection . writable ( & test_io ( ) ) ;
2016-02-03 16:59:35 +01:00
assert! ( ! status . is_ok ( ) ) ;
assert_eq! ( 1 , connection . send_queue . len ( ) ) ;
}
2016-02-03 19:01:39 +01:00
#[ test ]
fn connection_read ( ) {
let mut connection = TestConnection ::new ( ) ;
connection . rec_size = 2048 ;
connection . rec_buf = vec! [ 10 ; 1024 ] ;
connection . socket . read_buffer = vec! [ 99 ; 2048 ] ;
let status = connection . readable ( ) ;
assert! ( status . is_ok ( ) ) ;
assert_eq! ( 1024 , connection . socket . cursor ) ;
}
#[ test ]
fn connection_read_from_broken ( ) {
let mut connection = TestBrokenConnection ::new ( ) ;
connection . rec_size = 2048 ;
let status = connection . readable ( ) ;
assert! ( ! status . is_ok ( ) ) ;
assert_eq! ( 0 , connection . rec_buf . len ( ) ) ;
}
#[ test ]
fn connection_read_nothing ( ) {
let mut connection = TestConnection ::new ( ) ;
connection . rec_size = 2048 ;
let status = connection . readable ( ) ;
assert! ( status . is_ok ( ) ) ;
assert_eq! ( 0 , connection . rec_buf . len ( ) ) ;
}
#[ test ]
fn connection_read_full ( ) {
let mut connection = TestConnection ::new ( ) ;
connection . rec_size = 1024 ;
connection . rec_buf = vec! [ 76 ; 1024 ] ;
let status = connection . readable ( ) ;
assert! ( status . is_ok ( ) ) ;
assert_eq! ( 0 , connection . socket . cursor ) ;
}
2016-02-03 17:00:05 +01:00
}