Merge remote-tracking branch 'origin/master' into gav
This commit is contained in:
commit
a197f3f60e
@ -19,22 +19,33 @@ use tiny_keccak::Keccak;
|
||||
|
||||
const ENCRYPTED_HEADER_LEN: usize = 32;
|
||||
|
||||
/// Low level tcp connection
|
||||
pub struct Connection {
|
||||
/// Connection id (token)
|
||||
pub token: Token,
|
||||
/// Network socket
|
||||
pub socket: TcpStream,
|
||||
/// Receive buffer
|
||||
rec_buf: Bytes,
|
||||
/// Expected size
|
||||
rec_size: usize,
|
||||
/// Send out packets FIFO
|
||||
send_queue: VecDeque<Cursor<Bytes>>,
|
||||
/// Event flags this connection expects
|
||||
interest: EventSet,
|
||||
}
|
||||
|
||||
/// Connection write status.
|
||||
#[derive(PartialEq, Eq)]
|
||||
pub enum WriteStatus {
|
||||
/// Some data is still pending for current packet
|
||||
Ongoing,
|
||||
/// All data sent.
|
||||
Complete
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
/// Create a new connection with given id and socket.
|
||||
pub fn new(token: Token, socket: TcpStream) -> Connection {
|
||||
Connection {
|
||||
token: token,
|
||||
@ -46,6 +57,7 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
/// Put a connection into read mode. Receiving up `size` bytes of data.
|
||||
pub fn expect(&mut self, size: usize) {
|
||||
if self.rec_size != self.rec_buf.len() {
|
||||
warn!(target:"net", "Unexpected connection read start");
|
||||
@ -54,6 +66,7 @@ impl Connection {
|
||||
self.rec_size = size;
|
||||
}
|
||||
|
||||
/// Readable IO handler. Called when there is some data to be read.
|
||||
//TODO: return a slice
|
||||
pub fn readable(&mut self) -> io::Result<Option<Bytes>> {
|
||||
if self.rec_size == 0 || self.rec_buf.len() >= self.rec_size {
|
||||
@ -72,6 +85,7 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a packet to send queue.
|
||||
pub fn send(&mut self, data: Bytes) {
|
||||
if data.len() != 0 {
|
||||
self.send_queue.push_back(Cursor::new(data));
|
||||
@ -81,6 +95,7 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
/// Writable IO handler. Called when the socket is ready to send.
|
||||
pub fn writable(&mut self) -> io::Result<WriteStatus> {
|
||||
if self.send_queue.is_empty() {
|
||||
return Ok(WriteStatus::Complete)
|
||||
@ -117,6 +132,7 @@ impl Connection {
|
||||
})
|
||||
}
|
||||
|
||||
/// Register this connection with the IO event loop.
|
||||
pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
|
||||
trace!(target: "net", "connection register; token={:?}", self.token);
|
||||
self.interest.insert(EventSet::readable());
|
||||
@ -126,6 +142,7 @@ impl Connection {
|
||||
})
|
||||
}
|
||||
|
||||
/// Update connection registration. Should be called at the end of the IO handler.
|
||||
pub fn reregister(&mut self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
|
||||
trace!(target: "net", "connection reregister; token={:?}", self.token);
|
||||
event_loop.reregister( &self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
|
||||
@ -135,30 +152,47 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
/// RLPx packet
|
||||
pub struct Packet {
|
||||
pub protocol: u16,
|
||||
pub data: Bytes,
|
||||
}
|
||||
|
||||
/// Encrypted connection receiving state.
|
||||
enum EncryptedConnectionState {
|
||||
/// Reading a header.
|
||||
Header,
|
||||
/// Reading the rest of the packet.
|
||||
Payload,
|
||||
}
|
||||
|
||||
/// Connection implementing RLPx framing
|
||||
/// https://github.com/ethereum/devp2p/blob/master/rlpx.md#framing
|
||||
pub struct EncryptedConnection {
|
||||
/// Underlying tcp connection
|
||||
connection: Connection,
|
||||
/// Egress data encryptor
|
||||
encoder: CtrMode<AesSafe256Encryptor>,
|
||||
/// Ingress data decryptor
|
||||
decoder: CtrMode<AesSafe256Encryptor>,
|
||||
/// Ingress data decryptor
|
||||
mac_encoder: EcbEncryptor<AesSafe256Encryptor, EncPadding<NoPadding>>,
|
||||
/// MAC for egress data
|
||||
egress_mac: Keccak,
|
||||
/// MAC for ingress data
|
||||
ingress_mac: Keccak,
|
||||
/// Read state
|
||||
read_state: EncryptedConnectionState,
|
||||
/// Disconnect timeout
|
||||
idle_timeout: Option<Timeout>,
|
||||
/// Protocol id for the last received packet
|
||||
protocol_id: u16,
|
||||
payload_len: u32,
|
||||
/// Payload expected to be received for the last header.
|
||||
payload_len: usize,
|
||||
}
|
||||
|
||||
impl EncryptedConnection {
|
||||
/// Create an encrypted connection out of the handshake. Consumes a handshake object.
|
||||
pub fn new(handshake: Handshake) -> Result<EncryptedConnection, UtilError> {
|
||||
let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_public));
|
||||
let mut nonce_material = H512::new();
|
||||
@ -208,6 +242,7 @@ impl EncryptedConnection {
|
||||
})
|
||||
}
|
||||
|
||||
/// Send a packet
|
||||
pub fn send_packet(&mut self, payload: &[u8]) -> Result<(), UtilError> {
|
||||
let mut header = RlpStream::new();
|
||||
let len = payload.len() as usize;
|
||||
@ -224,7 +259,7 @@ impl EncryptedConnection {
|
||||
self.egress_mac.clone().finalize(&mut packet[16..32]);
|
||||
self.encoder.encrypt(&mut RefReadBuffer::new(&payload), &mut RefWriteBuffer::new(&mut packet[32..(32 + len)]), padding == 0).expect("Invalid length or padding");
|
||||
if padding != 0 {
|
||||
let pad = [08; 16];
|
||||
let pad = [0u8; 16];
|
||||
self.encoder.encrypt(&mut RefReadBuffer::new(&pad[0..padding]), &mut RefWriteBuffer::new(&mut packet[(32 + len)..(32 + len + padding)]), true).expect("Invalid length or padding");
|
||||
}
|
||||
self.egress_mac.update(&packet[32..(32 + len + padding)]);
|
||||
@ -234,6 +269,7 @@ impl EncryptedConnection {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Decrypt and authenticate an incoming packet header. Prepare for receiving payload.
|
||||
fn read_header(&mut self, header: &[u8]) -> Result<(), UtilError> {
|
||||
if header.len() != ENCRYPTED_HEADER_LEN {
|
||||
return Err(From::from(NetworkError::Auth));
|
||||
@ -253,7 +289,7 @@ impl EncryptedConnection {
|
||||
let header_rlp = UntrustedRlp::new(&hdec[3..6]);
|
||||
let protocol_id = try!(header_rlp.val_at::<u16>(0));
|
||||
|
||||
self.payload_len = length;
|
||||
self.payload_len = length as usize;
|
||||
self.protocol_id = protocol_id;
|
||||
self.read_state = EncryptedConnectionState::Payload;
|
||||
|
||||
@ -263,9 +299,10 @@ impl EncryptedConnection {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Decrypt and authenticate packet payload.
|
||||
fn read_payload(&mut self, payload: &[u8]) -> Result<Packet, UtilError> {
|
||||
let padding = (16 - (self.payload_len % 16)) % 16;
|
||||
let full_length = (self.payload_len + padding + 16) as usize;
|
||||
let full_length = self.payload_len + padding + 16;
|
||||
if payload.len() != full_length {
|
||||
return Err(From::from(NetworkError::Auth));
|
||||
}
|
||||
@ -278,15 +315,17 @@ impl EncryptedConnection {
|
||||
return Err(From::from(NetworkError::Auth));
|
||||
}
|
||||
|
||||
let mut packet = vec![0u8; self.payload_len as usize];
|
||||
self.decoder.decrypt(&mut RefReadBuffer::new(&payload[0..(full_length - 16)]), &mut RefWriteBuffer::new(&mut packet), false).expect("Invalid length or padding");
|
||||
packet.resize(self.payload_len as usize, 0u8);
|
||||
let mut packet = vec![0u8; self.payload_len];
|
||||
self.decoder.decrypt(&mut RefReadBuffer::new(&payload[0..self.payload_len]), &mut RefWriteBuffer::new(&mut packet), false).expect("Invalid length or padding");
|
||||
let mut pad_buf = [0u8; 16];
|
||||
self.decoder.decrypt(&mut RefReadBuffer::new(&payload[self.payload_len..(payload.len() - 16)]), &mut RefWriteBuffer::new(&mut pad_buf), false).expect("Invalid length or padding");
|
||||
Ok(Packet {
|
||||
protocol: self.protocol_id,
|
||||
data: packet
|
||||
})
|
||||
}
|
||||
|
||||
/// Update MAC after reading or writing any data.
|
||||
fn update_mac(mac: &mut Keccak, mac_encoder: &mut EcbEncryptor<AesSafe256Encryptor, EncPadding<NoPadding>>, seed: &[u8]) {
|
||||
let mut prev = H128::new();
|
||||
mac.clone().finalize(&mut prev);
|
||||
@ -298,9 +337,9 @@ impl EncryptedConnection {
|
||||
mac.update(&enc);
|
||||
}
|
||||
|
||||
/// Readable IO handler. Tracker receive status and returns decoded packet if avaialable.
|
||||
pub fn readable(&mut self, event_loop: &mut EventLoop<Host>) -> Result<Option<Packet>, UtilError> {
|
||||
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
|
||||
try!(self.connection.reregister(event_loop));
|
||||
match self.read_state {
|
||||
EncryptedConnectionState::Header => {
|
||||
match try!(self.connection.readable()) {
|
||||
@ -324,13 +363,14 @@ impl EncryptedConnection {
|
||||
}
|
||||
}
|
||||
|
||||
/// Writable IO handler. Processes send queeue.
|
||||
pub fn writable(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
||||
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
|
||||
try!(self.connection.writable());
|
||||
try!(self.connection.reregister(event_loop));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Register this connection with the event handler.
|
||||
pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
||||
self.connection.expect(ENCRYPTED_HEADER_LEN);
|
||||
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
|
||||
@ -338,6 +378,12 @@ impl EncryptedConnection {
|
||||
try!(self.connection.reregister(event_loop));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update connection registration. This should be called at the end of the event loop.
|
||||
pub fn reregister(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
||||
try!(self.connection.reregister(event_loop));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -12,23 +12,39 @@ use network::NetworkError;
|
||||
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
enum HandshakeState {
|
||||
/// Just created
|
||||
New,
|
||||
/// Waiting for auth packet
|
||||
ReadingAuth,
|
||||
/// Waiting for ack packet
|
||||
ReadingAck,
|
||||
/// Ready to start a session
|
||||
StartSession,
|
||||
}
|
||||
|
||||
/// RLPx protocol handhake. See https://github.com/ethereum/devp2p/blob/master/rlpx.md#encrypted-handshake
|
||||
pub struct Handshake {
|
||||
/// Remote node public key
|
||||
pub id: NodeId,
|
||||
/// Underlying connection
|
||||
pub connection: Connection,
|
||||
/// Handshake state
|
||||
state: HandshakeState,
|
||||
/// Outgoing or incoming connection
|
||||
pub originated: bool,
|
||||
/// Disconnect timeout
|
||||
idle_timeout: Option<Timeout>,
|
||||
/// ECDH ephemeral
|
||||
pub ecdhe: KeyPair,
|
||||
/// Connection nonce
|
||||
pub nonce: H256,
|
||||
/// Handshake public key
|
||||
pub remote_public: Public,
|
||||
/// Remote connection nonce.
|
||||
pub remote_nonce: H256,
|
||||
/// A copy of received encryped auth packet
|
||||
pub auth_cipher: Bytes,
|
||||
/// A copy of received encryped ack packet
|
||||
pub ack_cipher: Bytes
|
||||
}
|
||||
|
||||
@ -36,6 +52,7 @@ const AUTH_PACKET_SIZE: usize = 307;
|
||||
const ACK_PACKET_SIZE: usize = 210;
|
||||
|
||||
impl Handshake {
|
||||
/// Create a new handshake object
|
||||
pub fn new(token: Token, id: &NodeId, socket: TcpStream, nonce: &H256) -> Result<Handshake, UtilError> {
|
||||
Ok(Handshake {
|
||||
id: id.clone(),
|
||||
@ -52,6 +69,7 @@ impl Handshake {
|
||||
})
|
||||
}
|
||||
|
||||
/// Start a handhsake
|
||||
pub fn start(&mut self, host: &HostInfo, originated: bool) -> Result<(), UtilError> {
|
||||
self.originated = originated;
|
||||
if originated {
|
||||
@ -64,10 +82,12 @@ impl Handshake {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if handshake is complete
|
||||
pub fn done(&self) -> bool {
|
||||
self.state == HandshakeState::StartSession
|
||||
}
|
||||
|
||||
/// Readable IO handler. Drives the state change.
|
||||
pub fn readable(&mut self, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<(), UtilError> {
|
||||
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
|
||||
match self.state {
|
||||
@ -97,6 +117,7 @@ impl Handshake {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writabe IO handler.
|
||||
pub fn writable(&mut self, event_loop: &mut EventLoop<Host>, _host: &HostInfo) -> Result<(), UtilError> {
|
||||
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
|
||||
try!(self.connection.writable());
|
||||
@ -106,6 +127,7 @@ impl Handshake {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Register the IO handler with the event loop
|
||||
pub fn register(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
||||
self.idle_timeout.map(|t| event_loop.clear_timeout(t));
|
||||
self.idle_timeout = event_loop.timeout_ms(self.connection.token, 1800).ok();
|
||||
@ -113,6 +135,7 @@ impl Handshake {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Parse, validate and confirm auth message
|
||||
fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
|
||||
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
|
||||
assert!(data.len() == AUTH_PACKET_SIZE);
|
||||
@ -134,6 +157,7 @@ impl Handshake {
|
||||
self.write_ack()
|
||||
}
|
||||
|
||||
/// Parse and validate ack message
|
||||
fn read_ack(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
|
||||
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
|
||||
assert!(data.len() == ACK_PACKET_SIZE);
|
||||
@ -144,6 +168,7 @@ impl Handshake {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sends auth message
|
||||
fn write_auth(&mut self, host: &HostInfo) -> Result<(), UtilError> {
|
||||
trace!(target:"net", "Sending handshake auth to {:?}", self.connection.socket.peer_addr());
|
||||
let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants
|
||||
@ -170,6 +195,7 @@ impl Handshake {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sends ack message
|
||||
fn write_ack(&mut self) -> Result<(), UtilError> {
|
||||
trace!(target:"net", "Sending handshake ack to {:?}", self.connection.socket.peer_addr());
|
||||
let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants
|
||||
|
@ -22,7 +22,9 @@ const MAX_CONNECTIONS: usize = 1024;
|
||||
const MAX_USER_TIMERS: usize = 32;
|
||||
const IDEAL_PEERS: u32 = 10;
|
||||
|
||||
/// Node public key
|
||||
pub type NodeId = H512;
|
||||
/// IO Timer id
|
||||
pub type TimerToken = usize;
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -47,13 +49,18 @@ impl NetworkConfiguration {
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
/// Noe address info
|
||||
pub struct NodeEndpoint {
|
||||
/// IP(V4 or V6) address
|
||||
address: SocketAddr,
|
||||
/// Address as string (can be host name).
|
||||
address_str: String,
|
||||
/// Conneciton port.
|
||||
udp_port: u16
|
||||
}
|
||||
|
||||
impl NodeEndpoint {
|
||||
/// Create endpoint from string. Performs name resolution if given a host name.
|
||||
fn from_str(s: &str) -> Result<NodeEndpoint, UtilError> {
|
||||
let address = s.to_socket_addrs().map(|mut i| i.next());
|
||||
match address {
|
||||
@ -124,39 +131,52 @@ const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1;
|
||||
const USER_TIMER: usize = LAST_CONNECTION;
|
||||
const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1;
|
||||
|
||||
/// Protocol handler level packet id
|
||||
pub type PacketId = u8;
|
||||
/// Protocol / handler id
|
||||
pub type ProtocolId = &'static str;
|
||||
|
||||
/// Messages used to communitate with the event loop from other threads.
|
||||
pub enum HostMessage {
|
||||
/// Shutdown the event loop
|
||||
Shutdown,
|
||||
/// Register a new protocol handler.
|
||||
AddHandler {
|
||||
handler: Box<ProtocolHandler+Send>,
|
||||
protocol: ProtocolId,
|
||||
versions: Vec<u8>,
|
||||
},
|
||||
/// Send data over the network.
|
||||
Send {
|
||||
peer: PeerId,
|
||||
packet_id: PacketId,
|
||||
protocol: ProtocolId,
|
||||
data: Vec<u8>,
|
||||
},
|
||||
/// Broadcast a message across the protocol handlers.
|
||||
UserMessage(UserMessage),
|
||||
}
|
||||
|
||||
/// Id for broadcast message
|
||||
pub type UserMessageId = u32;
|
||||
|
||||
/// User
|
||||
pub struct UserMessage {
|
||||
/// ID of a protocol
|
||||
pub protocol: ProtocolId,
|
||||
pub id: UserMessageId,
|
||||
pub data: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
/// Local (temporary) peer session ID.
|
||||
pub type PeerId = usize;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
/// Protocol info
|
||||
pub struct CapabilityInfo {
|
||||
pub protocol: ProtocolId,
|
||||
pub version: u8,
|
||||
/// Total number of packet IDs this protocol support.
|
||||
pub packet_count: u8,
|
||||
}
|
||||
|
||||
@ -169,7 +189,7 @@ impl Encodable for CapabilityInfo {
|
||||
}
|
||||
}
|
||||
|
||||
/// IO access point
|
||||
/// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem.
|
||||
pub struct HostIo<'s> {
|
||||
protocol: ProtocolId,
|
||||
connections: &'s mut Slab<ConnectionEntry>,
|
||||
@ -179,6 +199,7 @@ pub struct HostIo<'s> {
|
||||
}
|
||||
|
||||
impl<'s> HostIo<'s> {
|
||||
/// Create a new IO access point. Takes references to all the data that can be updated within the IO handler.
|
||||
fn new(protocol: ProtocolId, session: Option<Token>, event_loop: &'s mut EventLoop<Host>, connections: &'s mut Slab<ConnectionEntry>, timers: &'s mut Slab<UserTimer>) -> HostIo<'s> {
|
||||
HostIo {
|
||||
protocol: protocol,
|
||||
@ -252,24 +273,36 @@ struct UserTimer {
|
||||
delay: u64,
|
||||
}
|
||||
|
||||
/// Shared host information
|
||||
pub struct HostInfo {
|
||||
/// Our private and public keys.
|
||||
keys: KeyPair,
|
||||
/// Current network configuration
|
||||
config: NetworkConfiguration,
|
||||
/// Connection nonce.
|
||||
nonce: H256,
|
||||
/// RLPx protocol version
|
||||
pub protocol_version: u32,
|
||||
/// Client identifier
|
||||
pub client_version: String,
|
||||
/// TCP connection port.
|
||||
pub listen_port: u16,
|
||||
/// Registered capabilities (handlers)
|
||||
pub capabilities: Vec<CapabilityInfo>
|
||||
}
|
||||
|
||||
impl HostInfo {
|
||||
/// Returns public key
|
||||
pub fn id(&self) -> &NodeId {
|
||||
self.keys.public()
|
||||
}
|
||||
|
||||
/// Returns secret key
|
||||
pub fn secret(&self) -> &Secret {
|
||||
self.keys.secret()
|
||||
}
|
||||
|
||||
/// Increments and returns connection nonce.
|
||||
pub fn next_nonce(&mut self) -> H256 {
|
||||
self.nonce = self.nonce.sha3();
|
||||
return self.nonce.clone();
|
||||
@ -281,6 +314,7 @@ enum ConnectionEntry {
|
||||
Session(Session)
|
||||
}
|
||||
|
||||
/// Root IO handler. Manages protocol handlers, IO timers and network connections.
|
||||
pub struct Host {
|
||||
info: HostInfo,
|
||||
_udp_socket: UdpSocket,
|
||||
@ -293,13 +327,13 @@ pub struct Host {
|
||||
}
|
||||
|
||||
impl Host {
|
||||
/// Creates a new instance and registers it with the event loop.
|
||||
pub fn start(event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
||||
let config = NetworkConfiguration::new();
|
||||
/*
|
||||
match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() {
|
||||
Some(iface) => config.public_address = iface.addr.unwrap(),
|
||||
None => warn!("No public network interface"),
|
||||
}
|
||||
*/
|
||||
|
||||
let addr = config.listen_address;
|
||||
@ -458,7 +492,7 @@ impl Host {
|
||||
|
||||
|
||||
fn accept(&mut self, _event_loop: &mut EventLoop<Host>) {
|
||||
warn!(target: "net", "accept");
|
||||
trace!(target: "net", "accept");
|
||||
}
|
||||
|
||||
fn connection_writable(&mut self, token: Token, event_loop: &mut EventLoop<Host>) {
|
||||
@ -488,8 +522,17 @@ impl Host {
|
||||
if create_session {
|
||||
self.start_session(token, event_loop);
|
||||
}
|
||||
match self.connections.get_mut(token) {
|
||||
Some(&mut ConnectionEntry::Session(ref mut s)) => {
|
||||
s.reregister(event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e));
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
fn connection_closed(&mut self, token: Token, event_loop: &mut EventLoop<Host>) {
|
||||
self.kill_connection(token, event_loop);
|
||||
}
|
||||
|
||||
fn connection_readable(&mut self, token: Token, event_loop: &mut EventLoop<Host>) {
|
||||
let mut kill = false;
|
||||
@ -550,6 +593,12 @@ impl Host {
|
||||
h.read(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize(), packet_id, &data[1..]);
|
||||
}
|
||||
|
||||
match self.connections.get_mut(token) {
|
||||
Some(&mut ConnectionEntry::Session(ref mut s)) => {
|
||||
s.reregister(event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e));
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
fn start_session(&mut self, token: Token, event_loop: &mut EventLoop<Host>) {
|
||||
@ -571,7 +620,23 @@ impl Host {
|
||||
self.kill_connection(token, event_loop)
|
||||
}
|
||||
|
||||
fn kill_connection(&mut self, token: Token, _event_loop: &mut EventLoop<Host>) {
|
||||
fn kill_connection(&mut self, token: Token, event_loop: &mut EventLoop<Host>) {
|
||||
let mut to_disconnect: Vec<ProtocolId> = Vec::new();
|
||||
match self.connections.get_mut(token) {
|
||||
Some(&mut ConnectionEntry::Handshake(_)) => (), // just abandon handshake
|
||||
Some(&mut ConnectionEntry::Session(ref mut s)) if s.is_ready() => {
|
||||
for (p, _) in self.handlers.iter_mut() {
|
||||
if s.have_capability(p) {
|
||||
to_disconnect.push(p);
|
||||
}
|
||||
}
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
for p in to_disconnect {
|
||||
let mut h = self.handlers.get_mut(p).unwrap();
|
||||
h.disconnected(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize());
|
||||
}
|
||||
self.connections.remove(token);
|
||||
}
|
||||
}
|
||||
@ -581,7 +646,14 @@ impl Handler for Host {
|
||||
type Message = HostMessage;
|
||||
|
||||
fn ready(&mut self, event_loop: &mut EventLoop<Host>, token: Token, events: EventSet) {
|
||||
if events.is_readable() {
|
||||
if events.is_hup() {
|
||||
trace!(target: "net", "hup");
|
||||
match token.as_usize() {
|
||||
FIRST_CONNECTION ... LAST_CONNECTION => self.connection_closed(token, event_loop),
|
||||
_ => warn!(target: "net", "Unexpected hup"),
|
||||
};
|
||||
}
|
||||
else if events.is_readable() {
|
||||
match token.as_usize() {
|
||||
TCP_ACCEPT => self.accept(event_loop),
|
||||
IDLE => self.maintain_network(event_loop),
|
||||
|
@ -7,27 +7,44 @@ use error::*;
|
||||
use network::{NetworkError, DisconnectReason};
|
||||
use network::host::*;
|
||||
|
||||
/// Peer session over encrypted connection.
|
||||
/// When created waits for Hello packet exchange and signals ready state.
|
||||
/// Sends and receives protocol packets and handles basic packes such as ping/pong and disconnect.
|
||||
pub struct Session {
|
||||
/// Shared session information
|
||||
pub info: SessionInfo,
|
||||
/// Underlying connection
|
||||
connection: EncryptedConnection,
|
||||
/// Session ready flag. Set after successfull Hello packet exchange
|
||||
had_hello: bool,
|
||||
}
|
||||
|
||||
/// Structure used to report various session events.
|
||||
pub enum SessionData {
|
||||
None,
|
||||
/// Session is ready to send/receive packets.
|
||||
Ready,
|
||||
/// A packet has been received
|
||||
Packet {
|
||||
/// Packet data
|
||||
data: Vec<u8>,
|
||||
/// Packet protocol ID
|
||||
protocol: &'static str,
|
||||
/// Zero based packet ID
|
||||
packet_id: u8,
|
||||
},
|
||||
}
|
||||
|
||||
/// Shared session information
|
||||
pub struct SessionInfo {
|
||||
/// Peer public key
|
||||
pub id: NodeId,
|
||||
/// Peer client ID
|
||||
pub client_version: String,
|
||||
/// Peer RLPx protocol version
|
||||
pub protocol_version: u32,
|
||||
pub capabilities: Vec<SessionCapabilityInfo>,
|
||||
/// Peer protocol capabilities
|
||||
capabilities: Vec<SessionCapabilityInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
@ -48,7 +65,7 @@ impl Decodable for PeerCapabilityInfo {
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct SessionCapabilityInfo {
|
||||
struct SessionCapabilityInfo {
|
||||
pub protocol: &'static str,
|
||||
pub version: u8,
|
||||
pub packet_count: u8,
|
||||
@ -65,6 +82,7 @@ const PACKET_USER: u8 = 0x10;
|
||||
const PACKET_LAST: u8 = 0x7f;
|
||||
|
||||
impl Session {
|
||||
/// Create a new session out of comepleted handshake. Consumes handshake object.
|
||||
pub fn new(h: Handshake, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<Session, UtilError> {
|
||||
let id = h.id.clone();
|
||||
let connection = try!(EncryptedConnection::new(h));
|
||||
@ -84,6 +102,12 @@ impl Session {
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
/// Check if session is ready to send/receive data
|
||||
pub fn is_ready(&self) -> bool {
|
||||
self.had_hello
|
||||
}
|
||||
|
||||
/// Readable IO handler. Returns packet data if available.
|
||||
pub fn readable(&mut self, event_loop: &mut EventLoop<Host>, host: &HostInfo) -> Result<SessionData, UtilError> {
|
||||
match try!(self.connection.readable(event_loop)) {
|
||||
Some(data) => Ok(try!(self.read_packet(data, host))),
|
||||
@ -91,14 +115,22 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
/// Writable IO handler. Sends pending packets.
|
||||
pub fn writable(&mut self, event_loop: &mut EventLoop<Host>, _host: &HostInfo) -> Result<(), UtilError> {
|
||||
self.connection.writable(event_loop)
|
||||
}
|
||||
|
||||
/// Checks if peer supports given capability
|
||||
pub fn have_capability(&self, protocol: &str) -> bool {
|
||||
self.info.capabilities.iter().any(|c| c.protocol == protocol)
|
||||
}
|
||||
|
||||
/// Update registration with the event loop. Should be called at the end of the IO handler.
|
||||
pub fn reregister(&mut self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
||||
self.connection.reregister(event_loop)
|
||||
}
|
||||
|
||||
/// Send a protocol packet to peer.
|
||||
pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> {
|
||||
let mut i = 0usize;
|
||||
while protocol != self.info.capabilities[i].protocol {
|
||||
@ -160,7 +192,7 @@ impl Session {
|
||||
|
||||
fn write_hello(&mut self, host: &HostInfo) -> Result<(), UtilError> {
|
||||
let mut rlp = RlpStream::new();
|
||||
rlp.append(&(PACKET_HELLO as u32));
|
||||
rlp.append_raw(&[PACKET_HELLO as u8], 0);
|
||||
rlp.append_list(5)
|
||||
.append(&host.protocol_version)
|
||||
.append(&host.client_version)
|
||||
@ -218,11 +250,11 @@ impl Session {
|
||||
}
|
||||
|
||||
fn write_ping(&mut self) -> Result<(), UtilError> {
|
||||
self.send(try!(Session::prepare(PACKET_PING, 0)))
|
||||
self.send(try!(Session::prepare(PACKET_PING)))
|
||||
}
|
||||
|
||||
fn write_pong(&mut self) -> Result<(), UtilError> {
|
||||
self.send(try!(Session::prepare(PACKET_PONG, 0)))
|
||||
self.send(try!(Session::prepare(PACKET_PONG)))
|
||||
}
|
||||
|
||||
fn disconnect(&mut self, reason: DisconnectReason) -> NetworkError {
|
||||
@ -234,10 +266,10 @@ impl Session {
|
||||
NetworkError::Disconnect(reason)
|
||||
}
|
||||
|
||||
fn prepare(packet_id: u8, items: usize) -> Result<RlpStream, UtilError> {
|
||||
let mut rlp = RlpStream::new_list(1);
|
||||
fn prepare(packet_id: u8) -> Result<RlpStream, UtilError> {
|
||||
let mut rlp = RlpStream::new();
|
||||
rlp.append(&(packet_id as u32));
|
||||
rlp.append_list(items);
|
||||
rlp.append_list(0);
|
||||
Ok(rlp)
|
||||
}
|
||||
|
||||
|
@ -188,7 +188,7 @@ impl<'a, 'view> View<'a, 'view> for UntrustedRlp<'a> where 'a: 'view {
|
||||
}
|
||||
|
||||
fn val_at<T>(&self, index: usize) -> Result<T, DecoderError> where T: Decodable {
|
||||
self.at(index).unwrap().as_val()
|
||||
try!(self.at(index)).as_val()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -9,6 +9,7 @@ pub use std::io::{Read,Write};
|
||||
pub use std::hash::{Hash, Hasher};
|
||||
pub use std::error::Error as StdError;
|
||||
|
||||
pub use std::sync::*;
|
||||
pub use std::ops::*;
|
||||
pub use std::cmp::*;
|
||||
pub use std::cell::*;
|
||||
|
Loading…
Reference in New Issue
Block a user