Merge pull request #33 from gavofyork/network

Padding decryption, connection registration fixed
This commit is contained in:
Gav Wood 2016-01-10 23:58:53 +01:00
commit e116bd3d20
5 changed files with 205 additions and 29 deletions

View File

@ -19,22 +19,33 @@ use tiny_keccak::Keccak;
const ENCRYPTED_HEADER_LEN: usize = 32; const ENCRYPTED_HEADER_LEN: usize = 32;
/// Low level tcp connection
pub struct Connection { pub struct Connection {
/// Connection id (token)
pub token: Token, pub token: Token,
/// Network socket
pub socket: TcpStream, pub socket: TcpStream,
/// Receive buffer
rec_buf: Bytes, rec_buf: Bytes,
/// Expected size
rec_size: usize, rec_size: usize,
/// Send out packets FIFO
send_queue: VecDeque<Cursor<Bytes>>, send_queue: VecDeque<Cursor<Bytes>>,
/// Event flags this connection expects
interest: EventSet, interest: EventSet,
} }
/// Connection write status.
#[derive(PartialEq, Eq)] #[derive(PartialEq, Eq)]
pub enum WriteStatus { pub enum WriteStatus {
/// Some data is still pending for current packet
Ongoing, Ongoing,
/// All data sent.
Complete Complete
} }
impl Connection { impl Connection {
/// Create a new connection with given id and socket.
pub fn new(token: Token, socket: TcpStream) -> Connection { pub fn new(token: Token, socket: TcpStream) -> Connection {
Connection { Connection {
token: token, token: token,
@ -46,6 +57,7 @@ impl Connection {
} }
} }
/// Put a connection into read mode. Receiving up `size` bytes of data.
pub fn expect(&mut self, size: usize) { pub fn expect(&mut self, size: usize) {
if self.rec_size != self.rec_buf.len() { if self.rec_size != self.rec_buf.len() {
warn!(target:"net", "Unexpected connection read start"); warn!(target:"net", "Unexpected connection read start");
@ -54,6 +66,7 @@ impl Connection {
self.rec_size = size; self.rec_size = size;
} }
/// Readable IO handler. Called when there is some data to be read.
//TODO: return a slice //TODO: return a slice
pub fn readable(&mut self) -> io::Result<Option<Bytes>> { pub fn readable(&mut self) -> io::Result<Option<Bytes>> {
if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size { if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size {
@ -72,6 +85,7 @@ impl Connection {
} }
} }
/// Add a packet to send queue.
pub fn send(&mut self, data: Bytes) { pub fn send(&mut self, data: Bytes) {
if data.len() != 0 { if data.len() != 0 {
self.send_queue.push_back(Cursor::new(data)); self.send_queue.push_back(Cursor::new(data));
@ -81,6 +95,7 @@ impl Connection {
} }
} }
/// Writable IO handler. Called when the socket is ready to send.
pub fn writable(&mut self) -> io::Result<WriteStatus> { pub fn writable(&mut self) -> io::Result<WriteStatus> {
if self.send_queue.is_empty() { if self.send_queue.is_empty() {
return Ok(WriteStatus::Complete) return Ok(WriteStatus::Complete)
@ -117,6 +132,7 @@ impl Connection {
}) })
} }
/// Register this connection with the IO event loop.
pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> { pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection register; token={:?}", self.token); trace!(target: "net", "connection register; token={:?}", self.token);
self.interest.insert(EventSet::readable()); self.interest.insert(EventSet::readable());
@ -126,6 +142,7 @@ impl Connection {
}) })
} }
/// Update connection registration. Should be called at the end of the IO handler.
pub fn reregister(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> { pub fn reregister(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection reregister; token={:?}", self.token); trace!(target: "net", "connection reregister; token={:?}", self.token);
event_loop.reregister( &self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { event_loop.reregister( &self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
@ -135,30 +152,47 @@ impl Connection {
} }
} }
/// RLPx packet
pub struct Packet { pub struct Packet {
pub protocol: u16, pub protocol: u16,
pub data: Bytes, pub data: Bytes,
} }
/// Encrypted connection receiving state.
enum EncryptedConnectionState { enum EncryptedConnectionState {
/// Reading a header.
Header, Header,
/// Reading the rest of the packet.
Payload, Payload,
} }
/// Connection implementing RLPx framing
/// https://github.com/ethereum/devp2p/blob/master/rlpx.md#framing
pub struct EncryptedConnection { pub struct EncryptedConnection {
/// Underlying tcp connection
connection: Connection, connection: Connection,
/// Egress data encryptor
encoder: CtrMode<AesSafe256Encryptor>, encoder: CtrMode<AesSafe256Encryptor>,
/// Ingress data decryptor
decoder: CtrMode<AesSafe256Encryptor>, decoder: CtrMode<AesSafe256Encryptor>,
/// Ingress data decryptor
mac_encoder: EcbEncryptor<AesSafe256Encryptor, EncPadding<NoPadding>>, mac_encoder: EcbEncryptor<AesSafe256Encryptor, EncPadding<NoPadding>>,
/// MAC for egress data
egress_mac: Keccak, egress_mac: Keccak,
/// MAC for ingress data
ingress_mac: Keccak, ingress_mac: Keccak,
/// Read state
read_state: EncryptedConnectionState, read_state: EncryptedConnectionState,
/// Disconnect timeout
idle_timeout: Option<Timeout>, idle_timeout: Option<Timeout>,
/// Protocol id for the last received packet
protocol_id: u16, protocol_id: u16,
payload_len: u32, /// Payload expected to be received for the last header.
payload_len: usize,
} }
impl EncryptedConnection { impl EncryptedConnection {
/// Create an encrypted connection out of the handshake. Consumes a handshake object.
pub fn new(handshake: Handshake) -> Result<EncryptedConnection, UtilError> { pub fn new(handshake: Handshake) -> Result<EncryptedConnection, UtilError> {
let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_public)); let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_public));
let mut nonce_material = H512::new(); let mut nonce_material = H512::new();
@ -208,6 +242,7 @@ impl EncryptedConnection {
}) })
} }
/// Send a packet
pub fn send_packet(&mut self, payload: &[u8]) -> Result<(), UtilError> { pub fn send_packet(&mut self, payload: &[u8]) -> Result<(), UtilError> {
let mut header = RlpStream::new(); let mut header = RlpStream::new();
let len = payload.len() as usize; let len = payload.len() as usize;
@ -224,7 +259,7 @@ impl EncryptedConnection {
self.egress_mac.clone().finalize(&mut packet[16..32]); self.egress_mac.clone().finalize(&mut packet[16..32]);
self.encoder.encrypt(&mut RefReadBuffer::new(&payload), &mut RefWriteBuffer::new(&mut packet[32..(32 + len)]), padding == 0).expect("Invalid length or padding"); self.encoder.encrypt(&mut RefReadBuffer::new(&payload), &mut RefWriteBuffer::new(&mut packet[32..(32 + len)]), padding == 0).expect("Invalid length or padding");
if padding != 0 { if padding != 0 {
let pad = [08; 16]; let pad = [0u8; 16];
self.encoder.encrypt(&mut RefReadBuffer::new(&pad[0..padding]), &mut RefWriteBuffer::new(&mut packet[(32 + len)..(32 + len + padding)]), true).expect("Invalid length or padding"); self.encoder.encrypt(&mut RefReadBuffer::new(&pad[0..padding]), &mut RefWriteBuffer::new(&mut packet[(32 + len)..(32 + len + padding)]), true).expect("Invalid length or padding");
} }
self.egress_mac.update(&packet[32..(32 + len + padding)]); self.egress_mac.update(&packet[32..(32 + len + padding)]);
@ -234,6 +269,7 @@ impl EncryptedConnection {
Ok(()) Ok(())
} }
/// Decrypt and authenticate an incoming packet header. Prepare for receiving payload.
fn read_header(&mut self, header: &[u8]) -> Result<(), UtilError> { fn read_header(&mut self, header: &[u8]) -> Result<(), UtilError> {
if header.len() != ENCRYPTED_HEADER_LEN { if header.len() != ENCRYPTED_HEADER_LEN {
return Err(From::from(NetworkError::Auth)); return Err(From::from(NetworkError::Auth));
@ -253,7 +289,7 @@ impl EncryptedConnection {
let header_rlp = UntrustedRlp::new(&hdec[3..6]); let header_rlp = UntrustedRlp::new(&hdec[3..6]);
let protocol_id = try!(header_rlp.val_at::<u16>(0)); let protocol_id = try!(header_rlp.val_at::<u16>(0));
self.payload_len = length; self.payload_len = length as usize;
self.protocol_id = protocol_id; self.protocol_id = protocol_id;
self.read_state = EncryptedConnectionState::Payload; self.read_state = EncryptedConnectionState::Payload;
@ -263,9 +299,10 @@ impl EncryptedConnection {
Ok(()) Ok(())
} }
/// Decrypt and authenticate packet payload.
fn read_payload(&mut self, payload: &[u8]) -> Result<Packet, UtilError> { fn read_payload(&mut self, payload: &[u8]) -> Result<Packet, UtilError> {
let padding = (16 - (self.payload_len % 16)) % 16; let padding = (16 - (self.payload_len % 16)) % 16;
let full_length = (self.payload_len + padding + 16) as usize; let full_length = self.payload_len + padding + 16;
if payload.len() != full_length { if payload.len() != full_length {
return Err(From::from(NetworkError::Auth)); return Err(From::from(NetworkError::Auth));
} }
@ -278,15 +315,17 @@ impl EncryptedConnection {
return Err(From::from(NetworkError::Auth)); return Err(From::from(NetworkError::Auth));
} }
let mut packet = vec![0u8; self.payload_len as usize]; let mut packet = vec![0u8; self.payload_len];
self.decoder.decrypt(&mut RefReadBuffer::new(&payload[0..(full_length - 16)]), &mut RefWriteBuffer::new(&mut packet), false).expect("Invalid length or padding"); self.decoder.decrypt(&mut RefReadBuffer::new(&payload[0..self.payload_len]), &mut RefWriteBuffer::new(&mut packet), false).expect("Invalid length or padding");
packet.resize(self.payload_len as usize, 0u8); let mut pad_buf = [0u8; 16];
self.decoder.decrypt(&mut RefReadBuffer::new(&payload[self.payload_len..(payload.len() - 16)]), &mut RefWriteBuffer::new(&mut pad_buf), false).expect("Invalid length or padding");
Ok(Packet { Ok(Packet {
protocol: self.protocol_id, protocol: self.protocol_id,
data: packet data: packet
}) })
} }
/// Update MAC after reading or writing any data.
fn update_mac(mac: &mut Keccak, mac_encoder: &mut EcbEncryptor<AesSafe256Encryptor, EncPadding<NoPadding>>, seed: &[u8]) { fn update_mac(mac: &mut Keccak, mac_encoder: &mut EcbEncryptor<AesSafe256Encryptor, EncPadding<NoPadding>>, seed: &[u8]) {
let mut prev = H128::new(); let mut prev = H128::new();
mac.clone().finalize(&mut prev); mac.clone().finalize(&mut prev);
@ -298,9 +337,9 @@ impl EncryptedConnection {
mac.update(&enc); mac.update(&enc);
} }
/// Readable IO handler. Tracker receive status and returns decoded packet if avaialable.
pub fn readable(&mut self, event_loop: &mut EventLoop<Host>) -> Result<Option<Packet>, UtilError> { pub fn readable(&mut self, event_loop: &mut EventLoop<Host>) -> Result<Option<Packet>, UtilError> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout.map(|t| event_loop.clear_timeout(t));
try!(self.connection.reregister(event_loop));
match self.read_state { match self.read_state {
EncryptedConnectionState::Header => { EncryptedConnectionState::Header => {
match try!(self.connection.readable()) { match try!(self.connection.readable()) {
@ -324,13 +363,14 @@ impl EncryptedConnection {
} }
} }
/// Writable IO handler. Processes send queeue.
pub fn writable(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> { pub fn writable(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout.map(|t| event_loop.clear_timeout(t));
try!(self.connection.writable()); try!(self.connection.writable());
try!(self.connection.reregister(event_loop));
Ok(()) Ok(())
} }
/// Register this connection with the event handler.
pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> { pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
self.connection.expect(ENCRYPTED_HEADER_LEN); self.connection.expect(ENCRYPTED_HEADER_LEN);
self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout.map(|t| event_loop.clear_timeout(t));
@ -338,6 +378,12 @@ impl EncryptedConnection {
try!(self.connection.reregister(event_loop)); try!(self.connection.reregister(event_loop));
Ok(()) Ok(())
} }
/// Update connection registration. This should be called at the end of the event loop.
pub fn reregister(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
try!(self.connection.reregister(event_loop));
Ok(())
}
} }
#[test] #[test]

View File

@ -12,23 +12,39 @@ use network::NetworkError;
#[derive(PartialEq, Eq, Debug)] #[derive(PartialEq, Eq, Debug)]
enum HandshakeState { enum HandshakeState {
/// Just created
New, New,
/// Waiting for auth packet
ReadingAuth, ReadingAuth,
/// Waiting for ack packet
ReadingAck, ReadingAck,
/// Ready to start a session
StartSession, StartSession,
} }
/// RLPx protocol handhake. See https://github.com/ethereum/devp2p/blob/master/rlpx.md#encrypted-handshake
pub struct Handshake { pub struct Handshake {
/// Remote node public key
pub id: NodeId, pub id: NodeId,
/// Underlying connection
pub connection: Connection, pub connection: Connection,
/// Handshake state
state: HandshakeState, state: HandshakeState,
/// Outgoing or incoming connection
pub originated: bool, pub originated: bool,
/// Disconnect timeout
idle_timeout: Option<Timeout>, idle_timeout: Option<Timeout>,
/// ECDH ephemeral
pub ecdhe: KeyPair, pub ecdhe: KeyPair,
/// Connection nonce
pub nonce: H256, pub nonce: H256,
/// Handshake public key
pub remote_public: Public, pub remote_public: Public,
/// Remote connection nonce.
pub remote_nonce: H256, pub remote_nonce: H256,
/// A copy of received encryped auth packet
pub auth_cipher: Bytes, pub auth_cipher: Bytes,
/// A copy of received encryped ack packet
pub ack_cipher: Bytes pub ack_cipher: Bytes
} }
@ -36,6 +52,7 @@ const AUTH_PACKET_SIZE: usize = 307;
const ACK_PACKET_SIZE: usize = 210; const ACK_PACKET_SIZE: usize = 210;
impl Handshake { impl Handshake {
/// Create a new handshake object
pub fn new(token: Token, id: &NodeId, socket: TcpStream, nonce: &H256) -> Result<Handshake, UtilError> { pub fn new(token: Token, id: &NodeId, socket: TcpStream, nonce: &H256) -> Result<Handshake, UtilError> {
Ok(Handshake { Ok(Handshake {
id: id.clone(), id: id.clone(),
@ -52,6 +69,7 @@ impl Handshake {
}) })
} }
/// Start a handhsake
pub fn start(&mut self, host: &HostInfo, originated: bool) -> Result<(), UtilError> { pub fn start(&mut self, host: &HostInfo, originated: bool) -> Result<(), UtilError> {
self.originated = originated; self.originated = originated;
if originated { if originated {
@ -64,10 +82,12 @@ impl Handshake {
Ok(()) Ok(())
} }
/// Check if handshake is complete
pub fn done(&self) -> bool { pub fn done(&self) -> bool {
self.state == HandshakeState::StartSession self.state == HandshakeState::StartSession
} }
/// Readable IO handler. Drives the state change.
pub fn readable(&mut self, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<(), UtilError> { pub fn readable(&mut self, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<(), UtilError> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout.map(|t| event_loop.clear_timeout(t));
match self.state { match self.state {
@ -97,6 +117,7 @@ impl Handshake {
Ok(()) Ok(())
} }
/// Writabe IO handler.
pub fn writable(&mut self, event_loop: &mut EventLoop<Host>, _host: &HostInfo) -> Result<(), UtilError> { pub fn writable(&mut self, event_loop: &mut EventLoop<Host>, _host: &HostInfo) -> Result<(), UtilError> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout.map(|t| event_loop.clear_timeout(t));
try!(self.connection.writable()); try!(self.connection.writable());
@ -106,6 +127,7 @@ impl Handshake {
Ok(()) Ok(())
} }
/// Register the IO handler with the event loop
pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> { pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
self.idle_timeout.map(|t| event_loop.clear_timeout(t)); self.idle_timeout.map(|t| event_loop.clear_timeout(t));
self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok(); self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok();
@ -113,6 +135,7 @@ impl Handshake {
Ok(()) Ok(())
} }
/// Parse, validate and confirm auth message
fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> { fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
assert!(data.len() == AUTH_PACKET_SIZE); assert!(data.len() == AUTH_PACKET_SIZE);
@ -134,6 +157,7 @@ impl Handshake {
self.write_ack() self.write_ack()
} }
/// Parse and validate ack message
fn read_ack(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> { fn read_ack(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr()); trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
assert!(data.len() == ACK_PACKET_SIZE); assert!(data.len() == ACK_PACKET_SIZE);
@ -144,6 +168,7 @@ impl Handshake {
Ok(()) Ok(())
} }
/// Sends auth message
fn write_auth(&mut self, host: &HostInfo) -> Result<(), UtilError> { fn write_auth(&mut self, host: &HostInfo) -> Result<(), UtilError> {
trace!(target:"net", "Sending handshake auth to {:?}", self.connection.socket.peer_addr()); trace!(target:"net", "Sending handshake auth to {:?}", self.connection.socket.peer_addr());
let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants
@ -170,6 +195,7 @@ impl Handshake {
Ok(()) Ok(())
} }
/// Sends ack message
fn write_ack(&mut self) -> Result<(), UtilError> { fn write_ack(&mut self) -> Result<(), UtilError> {
trace!(target:"net", "Sending handshake ack to {:?}", self.connection.socket.peer_addr()); trace!(target:"net", "Sending handshake ack to {:?}", self.connection.socket.peer_addr());
let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants

View File

@ -22,7 +22,9 @@ const MAX_CONNECTIONS: usize = 1024;
const MAX_USER_TIMERS: usize = 32; const MAX_USER_TIMERS: usize = 32;
const IDEAL_PEERS: u32 = 10; const IDEAL_PEERS: u32 = 10;
/// Node public key
pub type NodeId = H512; pub type NodeId = H512;
/// IO Timer id
pub type TimerToken = usize; pub type TimerToken = usize;
#[derive(Debug)] #[derive(Debug)]
@ -47,13 +49,18 @@ impl NetworkConfiguration {
} }
#[derive(Debug)] #[derive(Debug)]
/// Noe address info
pub struct NodeEndpoint { pub struct NodeEndpoint {
/// IP(V4 or V6) address
address: SocketAddr, address: SocketAddr,
/// Address as string (can be host name).
address_str: String, address_str: String,
/// Conneciton port.
udp_port: u16 udp_port: u16
} }
impl NodeEndpoint { impl NodeEndpoint {
/// Create endpoint from string. Performs name resolution if given a host name.
fn from_str(s: &str) -> Result<NodeEndpoint, UtilError> { fn from_str(s: &str) -> Result<NodeEndpoint, UtilError> {
let address = s.to_socket_addrs().map(|mut i| i.next()); let address = s.to_socket_addrs().map(|mut i| i.next());
match address { match address {
@ -124,39 +131,52 @@ const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1;
const USER_TIMER: usize = LAST_CONNECTION; const USER_TIMER: usize = LAST_CONNECTION;
const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1;
/// Protocol handler level packet id
pub type PacketId = u8; pub type PacketId = u8;
/// Protocol / handler id
pub type ProtocolId = &'static str; pub type ProtocolId = &'static str;
/// Messages used to communitate with the event loop from other threads.
pub enum HostMessage { pub enum HostMessage {
/// Shutdown the event loop
Shutdown, Shutdown,
/// Register a new protocol handler.
AddHandler { AddHandler {
handler: Box<ProtocolHandler+Send>, handler: Box<ProtocolHandler+Send>,
protocol: ProtocolId, protocol: ProtocolId,
versions: Vec<u8>, versions: Vec<u8>,
}, },
/// Send data over the network.
Send { Send {
peer: PeerId, peer: PeerId,
packet_id: PacketId, packet_id: PacketId,
protocol: ProtocolId, protocol: ProtocolId,
data: Vec<u8>, data: Vec<u8>,
}, },
/// Broadcast a message across the protocol handlers.
UserMessage(UserMessage), UserMessage(UserMessage),
} }
/// Id for broadcast message
pub type UserMessageId = u32; pub type UserMessageId = u32;
/// User
pub struct UserMessage { pub struct UserMessage {
/// ID of a protocol
pub protocol: ProtocolId, pub protocol: ProtocolId,
pub id: UserMessageId, pub id: UserMessageId,
pub data: Option<Vec<u8>>, pub data: Option<Vec<u8>>,
} }
/// Local (temporary) peer session ID.
pub type PeerId = usize; pub type PeerId = usize;
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
/// Protocol info
pub struct CapabilityInfo { pub struct CapabilityInfo {
pub protocol: ProtocolId, pub protocol: ProtocolId,
pub version: u8, pub version: u8,
/// Total number of packet IDs this protocol support.
pub packet_count: u8, pub packet_count: u8,
} }
@ -169,7 +189,7 @@ impl Encodable for CapabilityInfo {
} }
} }
/// IO access point /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
pub struct HostIo<'s> { pub struct HostIo<'s> {
protocol: ProtocolId, protocol: ProtocolId,
connections: &'s mut Slab<ConnectionEntry>, connections: &'s mut Slab<ConnectionEntry>,
@ -179,6 +199,7 @@ pub struct HostIo<'s> {
} }
impl<'s> HostIo<'s> { impl<'s> HostIo<'s> {
/// Create a new IO access point. Takes references to all the data that can be updated within the IO handler.
fn new(protocol: ProtocolId, session: Option<Token>, event_loop: &'s mut EventLoop<Host>, connections: &'s mut Slab<ConnectionEntry>, timers: &'s mut Slab<UserTimer>) -> HostIo<'s> { fn new(protocol: ProtocolId, session: Option<Token>, event_loop: &'s mut EventLoop<Host>, connections: &'s mut Slab<ConnectionEntry>, timers: &'s mut Slab<UserTimer>) -> HostIo<'s> {
HostIo { HostIo {
protocol: protocol, protocol: protocol,
@ -252,24 +273,36 @@ struct UserTimer {
delay: u64, delay: u64,
} }
/// Shared host information
pub struct HostInfo { pub struct HostInfo {
/// Our private and public keys.
keys: KeyPair, keys: KeyPair,
/// Current network configuration
config: NetworkConfiguration, config: NetworkConfiguration,
/// Connection nonce.
nonce: H256, nonce: H256,
/// RLPx protocol version
pub protocol_version: u32, pub protocol_version: u32,
/// Client identifier
pub client_version: String, pub client_version: String,
/// TCP connection port.
pub listen_port: u16, pub listen_port: u16,
/// Registered capabilities (handlers)
pub capabilities: Vec<CapabilityInfo> pub capabilities: Vec<CapabilityInfo>
} }
impl HostInfo { impl HostInfo {
/// Returns public key
pub fn id(&self) -> &NodeId { pub fn id(&self) -> &NodeId {
self.keys.public() self.keys.public()
} }
/// Returns secret key
pub fn secret(&self) -> &Secret { pub fn secret(&self) -> &Secret {
self.keys.secret() self.keys.secret()
} }
/// Increments and returns connection nonce.
pub fn next_nonce(&mut self) -> H256 { pub fn next_nonce(&mut self) -> H256 {
self.nonce = self.nonce.sha3(); self.nonce = self.nonce.sha3();
return self.nonce.clone(); return self.nonce.clone();
@ -281,6 +314,7 @@ enum ConnectionEntry {
Session(Session) Session(Session)
} }
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
pub struct Host { pub struct Host {
info: HostInfo, info: HostInfo,
_udp_socket: UdpSocket, _udp_socket: UdpSocket,
@ -293,14 +327,14 @@ pub struct Host {
} }
impl Host { impl Host {
/// Creates a new instance and registers it with the event loop.
pub fn start(event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> { pub fn start(event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
let config = NetworkConfiguration::new(); let config = NetworkConfiguration::new();
/* /*
match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() { match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() {
Some(iface) => config.public_address = iface.addr.unwrap(), Some(iface) => config.public_address = iface.addr.unwrap(),
None => warn!("No public network interface"), None => warn!("No public network interface"),
} */
*/
let addr = config.listen_address; let addr = config.listen_address;
// Setup the server socket // Setup the server socket
@ -458,7 +492,7 @@ impl Host {
fn accept(&mut self, _event_loop: &mut EventLoop<Host>) { fn accept(&mut self, _event_loop: &mut EventLoop<Host>) {
warn!(target: "net", "accept"); trace!(target: "net", "accept");
} }
fn connection_writable(&mut self, token: Token, event_loop: &mut EventLoop<Host>) { fn connection_writable(&mut self, token: Token, event_loop: &mut EventLoop<Host>) {
@ -488,8 +522,17 @@ impl Host {
if create_session { if create_session {
self.start_session(token, event_loop); self.start_session(token, event_loop);
} }
match self.connections.get_mut(token) {
Some(&mut ConnectionEntry::Session(ref mut s)) => {
s.reregister(event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e));
},
_ => (),
}
} }
fn connection_closed(&mut self, token: Token, event_loop: &mut EventLoop<Host>) {
self.kill_connection(token, event_loop);
}
fn connection_readable(&mut self, token: Token, event_loop: &mut EventLoop<Host>) { fn connection_readable(&mut self, token: Token, event_loop: &mut EventLoop<Host>) {
let mut kill = false; let mut kill = false;
@ -550,6 +593,12 @@ impl Host {
h.read(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize(), packet_id, &data[1..]); h.read(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize(), packet_id, &data[1..]);
} }
match self.connections.get_mut(token) {
Some(&mut ConnectionEntry::Session(ref mut s)) => {
s.reregister(event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e));
},
_ => (),
}
} }
fn start_session(&mut self, token: Token, event_loop: &mut EventLoop<Host>) { fn start_session(&mut self, token: Token, event_loop: &mut EventLoop<Host>) {
@ -571,7 +620,23 @@ impl Host {
self.kill_connection(token, event_loop) self.kill_connection(token, event_loop)
} }
fn kill_connection(&mut self, token: Token, _event_loop: &mut EventLoop<Host>) { fn kill_connection(&mut self, token: Token, event_loop: &mut EventLoop<Host>) {
let mut to_disconnect: Vec<ProtocolId> = Vec::new();
match self.connections.get_mut(token) {
Some(&mut ConnectionEntry::Handshake(_)) => (), // just abandon handshake
Some(&mut ConnectionEntry::Session(ref mut s)) if s.is_ready() => {
for (p, _) in self.handlers.iter_mut() {
if s.have_capability(p) {
to_disconnect.push(p);
}
}
},
_ => (),
}
for p in to_disconnect {
let mut h = self.handlers.get_mut(p).unwrap();
h.disconnected(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize());
}
self.connections.remove(token); self.connections.remove(token);
} }
} }
@ -581,7 +646,14 @@ impl Handler for Host {
type Message = HostMessage; type Message = HostMessage;
fn ready(&mut self, event_loop: &mut EventLoop<Host>, token: Token, events: EventSet) { fn ready(&mut self, event_loop: &mut EventLoop<Host>, token: Token, events: EventSet) {
if events.is_readable() { if events.is_hup() {
trace!(target: "net", "hup");
match token.as_usize() {
FIRST_CONNECTION ... LAST_CONNECTION => self.connection_closed(token, event_loop),
_ => warn!(target: "net", "Unexpected hup"),
};
}
else if events.is_readable() {
match token.as_usize() { match token.as_usize() {
TCP_ACCEPT => self.accept(event_loop), TCP_ACCEPT => self.accept(event_loop),
IDLE => self.maintain_network(event_loop), IDLE => self.maintain_network(event_loop),

View File

@ -7,27 +7,44 @@ use error::*;
use network::{NetworkError, DisconnectReason}; use network::{NetworkError, DisconnectReason};
use network::host::*; use network::host::*;
/// Peer session over encrypted connection.
/// When created waits for Hello packet exchange and signals ready state.
/// Sends and receives protocol packets and handles basic packes such as ping/pong and disconnect.
pub struct Session { pub struct Session {
/// Shared session information
pub info: SessionInfo, pub info: SessionInfo,
/// Underlying connection
connection: EncryptedConnection, connection: EncryptedConnection,
/// Session ready flag. Set after successfull Hello packet exchange
had_hello: bool, had_hello: bool,
} }
/// Structure used to report various session events.
pub enum SessionData { pub enum SessionData {
None, None,
/// Session is ready to send/receive packets.
Ready, Ready,
/// A packet has been received
Packet { Packet {
/// Packet data
data: Vec<u8>, data: Vec<u8>,
/// Packet protocol ID
protocol: &'static str, protocol: &'static str,
/// Zero based packet ID
packet_id: u8, packet_id: u8,
}, },
} }
/// Shared session information
pub struct SessionInfo { pub struct SessionInfo {
/// Peer public key
pub id: NodeId, pub id: NodeId,
/// Peer client ID
pub client_version: String, pub client_version: String,
/// Peer RLPx protocol version
pub protocol_version: u32, pub protocol_version: u32,
pub capabilities: Vec<SessionCapabilityInfo>, /// Peer protocol capabilities
capabilities: Vec<SessionCapabilityInfo>,
} }
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
@ -48,7 +65,7 @@ impl Decodable for PeerCapabilityInfo {
} }
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub struct SessionCapabilityInfo { struct SessionCapabilityInfo {
pub protocol: &'static str, pub protocol: &'static str,
pub version: u8, pub version: u8,
pub packet_count: u8, pub packet_count: u8,
@ -65,6 +82,7 @@ const PACKET_USER: u8 = 0x10;
const PACKET_LAST: u8 = 0x7f; const PACKET_LAST: u8 = 0x7f;
impl Session { impl Session {
/// Create a new session out of comepleted handshake. Consumes handshake object.
pub fn new(h: Handshake, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<Session, UtilError> { pub fn new(h: Handshake, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<Session, UtilError> {
let id = h.id.clone(); let id = h.id.clone();
let connection = try!(EncryptedConnection::new(h)); let connection = try!(EncryptedConnection::new(h));
@ -84,6 +102,12 @@ impl Session {
Ok(session) Ok(session)
} }
/// Check if session is ready to send/receive data
pub fn is_ready(&self) -> bool {
self.had_hello
}
/// Readable IO handler. Returns packet data if available.
pub fn readable(&mut self, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<SessionData, UtilError> { pub fn readable(&mut self, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<SessionData, UtilError> {
match try!(self.connection.readable(event_loop)) { match try!(self.connection.readable(event_loop)) {
Some(data) => Ok(try!(self.read_packet(data, host))), Some(data) => Ok(try!(self.read_packet(data, host))),
@ -91,14 +115,22 @@ impl Session {
} }
} }
/// Writable IO handler. Sends pending packets.
pub fn writable(&mut self, event_loop: &mut EventLoop<Host>, _host: &HostInfo) -> Result<(), UtilError> { pub fn writable(&mut self, event_loop: &mut EventLoop<Host>, _host: &HostInfo) -> Result<(), UtilError> {
self.connection.writable(event_loop) self.connection.writable(event_loop)
} }
/// Checks if peer supports given capability
pub fn have_capability(&self, protocol: &str) -> bool { pub fn have_capability(&self, protocol: &str) -> bool {
self.info.capabilities.iter().any(|c| c.protocol == protocol) self.info.capabilities.iter().any(|c| c.protocol == protocol)
} }
/// Update registration with the event loop. Should be called at the end of the IO handler.
pub fn reregister(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
self.connection.reregister(event_loop)
}
/// Send a protocol packet to peer.
pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> { pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> {
let mut i = 0usize; let mut i = 0usize;
while protocol != self.info.capabilities[i].protocol { while protocol != self.info.capabilities[i].protocol {
@ -160,7 +192,7 @@ impl Session {
fn write_hello(&mut self, host: &HostInfo) -> Result<(), UtilError> { fn write_hello(&mut self, host: &HostInfo) -> Result<(), UtilError> {
let mut rlp = RlpStream::new(); let mut rlp = RlpStream::new();
rlp.append(&(PACKET_HELLO as u32)); rlp.append_raw(&[PACKET_HELLO as u8], 0);
rlp.append_list(5) rlp.append_list(5)
.append(&host.protocol_version) .append(&host.protocol_version)
.append(&host.client_version) .append(&host.client_version)
@ -217,12 +249,12 @@ impl Session {
Ok(()) Ok(())
} }
fn write_ping(&mut self) -> Result<(), UtilError> { fn write_ping(&mut self) -> Result<(), UtilError> {
self.send(try!(Session::prepare(PACKET_PING, 0))) self.send(try!(Session::prepare(PACKET_PING)))
} }
fn write_pong(&mut self) -> Result<(), UtilError> { fn write_pong(&mut self) -> Result<(), UtilError> {
self.send(try!(Session::prepare(PACKET_PONG, 0))) self.send(try!(Session::prepare(PACKET_PONG)))
} }
fn disconnect(&mut self, reason: DisconnectReason) -> NetworkError { fn disconnect(&mut self, reason: DisconnectReason) -> NetworkError {
@ -234,10 +266,10 @@ impl Session {
NetworkError::Disconnect(reason) NetworkError::Disconnect(reason)
} }
fn prepare(packet_id: u8, items: usize) -> Result<RlpStream, UtilError> { fn prepare(packet_id: u8) -> Result<RlpStream, UtilError> {
let mut rlp = RlpStream::new_list(1); let mut rlp = RlpStream::new();
rlp.append(&(packet_id as u32)); rlp.append(&(packet_id as u32));
rlp.append_list(items); rlp.append_list(0);
Ok(rlp) Ok(rlp)
} }

View File

@ -188,7 +188,7 @@ impl<'a, 'view> View<'a, 'view> for UntrustedRlp<'a> where 'a: 'view {
} }
fn val_at<T>(&self, index: usize) -> Result<T, DecoderError> where T: Decodable { fn val_at<T>(&self, index: usize) -> Result<T, DecoderError> where T: Decodable {
self.at(index).unwrap().as_val() try!(self.at(index)).as_val()
} }
} }