diff --git a/Cargo.toml b/Cargo.toml index d836c093a..fe56748d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ log = "0.3" env_logger = "0.3" rustc-serialize = "0.3" arrayvec = "0.3" -mio = "0.4.4" +mio = "0.5.*" rand = "0.3.12" time = "0.1.34" tiny-keccak = "1.0" diff --git a/src/network/connection.rs b/src/network/connection.rs index 93a92daa7..65460f1bd 100644 --- a/src/network/connection.rs +++ b/src/network/connection.rs @@ -120,7 +120,7 @@ impl Connection { pub fn register(&mut self, event_loop: &mut EventLoop) -> io::Result<()> { trace!(target: "net", "connection register; token={:?}", self.token); self.interest.insert(EventSet::readable()); - event_loop.register_opt(&self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { + event_loop.register(&self.socket, self.token, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| { error!("Failed to reregister {:?}, {:?}", self.token, e); Err(e) }) diff --git a/src/network/handshake.rs b/src/network/handshake.rs index 788b7c1f7..db1511cd0 100644 --- a/src/network/handshake.rs +++ b/src/network/handshake.rs @@ -4,17 +4,15 @@ use hash::*; use bytes::Bytes; use crypto::*; use crypto; -use network::connection::{Connection, WriteStatus}; +use network::connection::{Connection}; use network::host::{NodeId, Host, HostInfo}; use network::Error; -#[derive(PartialEq, Eq)] +#[derive(PartialEq, Eq, Debug)] enum HandshakeState { New, ReadingAuth, - WritingAuth, ReadingAck, - WritingAck, StartSession, } @@ -89,7 +87,7 @@ impl Handshake { None => {} }; }, - _ => { panic!("Unexpected state") } + _ => { panic!("Unexpected state"); } } if self.state != HandshakeState::StartSession { try!(self.connection.reregister(event_loop)); @@ -99,27 +97,7 @@ impl Handshake { pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), Error> { self.idle_timeout.map(|t| event_loop.clear_timeout(t)); - match self.state { - HandshakeState::WritingAuth => { - match try!(self.connection.writable()) { - WriteStatus::Complete => { - self.connection.expect(ACK_PACKET_SIZE); - self.state = HandshakeState::ReadingAck; - }, - _ => {} - }; - }, - HandshakeState::WritingAck => { - match try!(self.connection.writable()) { - WriteStatus::Complete => { - self.connection.expect(32); - self.state = HandshakeState::StartSession; - }, - _ => {} - }; - }, - _ => { panic!("Unexpected state") } - } + try!(self.connection.writable()); if self.state != HandshakeState::StartSession { try!(self.connection.reregister(event_loop)); } @@ -185,7 +163,8 @@ impl Handshake { let message = try!(crypto::ecies::encrypt(&self.id, &data)); self.auth_cipher = message.clone(); self.connection.send(message); - self.state = HandshakeState::WritingAuth; + self.connection.expect(ACK_PACKET_SIZE); + self.state = HandshakeState::ReadingAck; Ok(()) } @@ -203,7 +182,7 @@ impl Handshake { let message = try!(crypto::ecies::encrypt(&self.id, &data)); self.ack_cipher = message.clone(); self.connection.send(message); - self.state = HandshakeState::WritingAck; + self.state = HandshakeState::StartSession; Ok(()) } } diff --git a/src/network/host.rs b/src/network/host.rs index e35314d88..6c37bb1d3 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -13,15 +13,17 @@ use crypto::*; use rlp::*; use time::Tm; use network::handshake::Handshake; -use network::session::Session; -use network::Error; +use network::session::{Session, SessionData}; +use network::{Error, ProtocolHandler}; const DEFAULT_PORT: u16 = 30304; const MAX_CONNECTIONS: usize = 1024; +const MAX_USER_TIMERS: usize = 32; const IDEAL_PEERS:u32 = 10; pub type NodeId = H512; +type TimerFun = Fn(&mut HostIo) -> bool + Send; #[derive(Debug)] struct NetworkConfiguration { @@ -141,33 +143,100 @@ const NODETABLE_MAINTAIN: usize = 5; const NODETABLE_DISCOVERY: usize = 6; const FIRST_CONNECTION: usize = 7; const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1; +const USER_TIMER: usize = LAST_CONNECTION; +const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; + +pub type PacketId = u32; +pub type ProtocolId = &'static str; pub enum HostMessage { - Shutdown + Shutdown, + AddHandler { + handler: Box, + protocol: ProtocolId, + versions: Vec, + }, + Send { + peer: PeerId, + packet_id: PacketId, + protocol: ProtocolId, + data: Vec, + }, + AddTimer { + handler: Box, + delay: u32, + protocol: ProtocolId, + } } +pub type PeerId = u32; + #[derive(Debug, PartialEq, Eq)] pub struct CapabilityInfo { - pub protocol: String, - pub version: u32, + pub protocol: ProtocolId, + pub version: u8, + pub packet_count: u8, } impl Encodable for CapabilityInfo { fn encode(&self, encoder: &mut E) -> () where E: Encoder { encoder.emit_list(|e| { self.protocol.encode(e); - self.version.encode(e); + (self.version as u32).encode(e); }); } } -impl Decodable for CapabilityInfo { - fn decode_untrusted(rlp: &UntrustedRlp) -> Result { - Ok(CapabilityInfo { - protocol: try!(String::decode_untrusted(&try!(rlp.at(0)))), - version: try!(u32::decode_untrusted(&try!(rlp.at(1)))), - }) +pub struct HostIo<'s> { + protocol: ProtocolId, + session: Option<&'s mut Session>, + event_loop: &'s mut EventLoop, + channel: &'s mut Sender +} + +impl<'s> HostIo<'s> { + fn new(protocol: ProtocolId, session: Option<&'s mut Session>, event_loop: &'s mut EventLoop, channel: &'s mut Sender) -> HostIo<'s> { + HostIo { + protocol: protocol, + session: session, + event_loop: event_loop, + channel: channel, + } } + + pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error> { + try!(self.channel.send(HostMessage::Send { + peer: peer, + packet_id: packet_id, + protocol: self.protocol, + data: data + })); + Ok(()) + } + + pub fn respond(&mut self, packet_id: PacketId, data: &[u8]) -> Result<(), Error> { + match self.session.as_mut() { + Some(session) => session.send_packet(self.protocol, packet_id as u8, data), + None => { + panic!("Respond: Session does not exist") + } + } + } + + fn register_timer(&mut self, ms: u32, handler: Box) -> Result<(), Error>{ + try!(self.channel.send(HostMessage::AddTimer { + delay: ms, + handler: handler, + protocol: self.protocol, + })); + Ok(()) + } +} + +struct UserTimer { + handler: Box, + protocol: ProtocolId, + delay: u32, } pub struct HostInfo { @@ -201,16 +270,18 @@ enum ConnectionEntry { pub struct Host { info: HostInfo, - sender: Sender, udp_socket: UdpSocket, listener: TcpListener, connections: Slab, + timers: Slab, nodes: HashMap, + handlers: HashMap>, idle_timeout: Timeout, + channel: Sender, } impl Host { - pub fn start() { + pub fn start(event_loop: &mut EventLoop) -> Result<(), Error> { let config = NetworkConfiguration::new(); /* match ::ifaces::Interface::get_all().unwrap().into_iter().filter(|x| x.kind == ::ifaces::Kind::Packet && x.addr.is_some()).next() { @@ -222,19 +293,16 @@ impl Host { let addr = config.listen_address; // Setup the server socket let listener = TcpListener::bind(&addr).unwrap(); - // Create an event loop - let mut event_loop = EventLoop::new().unwrap(); - let sender = event_loop.channel(); // Start listening for incoming connections - event_loop.register_opt(&listener, Token(TCP_ACCEPT), EventSet::readable(), PollOpt::edge()).unwrap(); + event_loop.register(&listener, Token(TCP_ACCEPT), EventSet::readable(), PollOpt::edge()).unwrap(); // Setup the client socket //let sock = TcpStream::connect(&addr).unwrap(); // Register the socket - //self.event_loop.register_opt(&sock, CLIENT, EventSet::readable(), PollOpt::edge()).unwrap(); + //self.event_loop.register(&sock, CLIENT, EventSet::readable(), PollOpt::edge()).unwrap(); let idle_timeout = event_loop.timeout_ms(Token(IDLE), 1000).unwrap(); //TODO: check delay // open the udp socket let udp_socket = UdpSocket::bound(&addr).unwrap(); - event_loop.register_opt(&udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap(); + event_loop.register(&udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap(); event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap(); let port = config.listen_address.port(); @@ -246,14 +314,17 @@ impl Host { protocol_version: 4, client_version: "parity".to_string(), listen_port: port, - capabilities: vec![ CapabilityInfo { protocol: "eth".to_string(), version: 63 }], + //capabilities: vec![ CapabilityInfo { protocol: "eth".to_string(), version: 63 }], + capabilities: Vec::new(), }, - sender: sender, udp_socket: udp_socket, listener: listener, connections: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS), + timers: Slab::new_starting_at(Token(USER_TIMER), MAX_USER_TIMERS), nodes: HashMap::new(), - idle_timeout: idle_timeout + handlers: HashMap::new(), + idle_timeout: idle_timeout, + channel: event_loop.channel(), }; host.add_node("enode://c022e7a27affdd1632f2e67dffeb87f02bf506344bb142e08d12b28e7e5c6e5dbb8183a46a77bff3631b51c12e8cf15199f797feafdc8834aaf078ad1a2bcfa0@127.0.0.1:30303"); @@ -263,14 +334,8 @@ impl Host { host.add_node("enode://7f25d3eab333a6b98a8b5ed68d962bb22c876ffcd5561fca54e3c2ef27f754df6f7fd7c9b74cc919067abac154fb8e1f8385505954f161ae440abc355855e034@54.207.93.166:30303"); host.add_node("enode://5374c1bff8df923d3706357eeb4983cd29a63be40a269aaa2296ee5f3b2119a8978c0ed68b8f6fc84aad0df18790417daadf91a4bfbb786a16c9b0a199fa254a@92.51.165.126:30303"); - event_loop.run(&mut host).unwrap(); - } - - fn stop(&mut self) { - } - - fn have_network(&mut self) -> bool { - true + try!(event_loop.run(&mut host)); + Ok(()) } fn add_node(&mut self, id: &str) { @@ -422,6 +487,8 @@ impl Host { self.start_session(token, event_loop); } } + + fn connection_readable(&mut self, token: Token, event_loop: &mut EventLoop) { let mut kill = false; let mut create_session = false; @@ -435,10 +502,31 @@ impl Host { create_session = h.done(); }, Some(&mut ConnectionEntry::Session(ref mut s)) => { - s.readable(event_loop, &self.info).unwrap_or_else(|e| { + let sd = { s.readable(event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Session read error: {:?}", e); kill = true; - }); + SessionData::None + }) }; + match sd { + SessionData::Ready => { + for (p, h) in self.handlers.iter_mut() { + if s.have_capability(p) { + h.connected(&mut HostIo::new(p, Some(s), event_loop, &mut self.channel), &(token.as_usize() as u32)); + } + } + }, + SessionData::Packet { + data, + protocol, + packet_id, + } => { + match self.handlers.get_mut(protocol) { + None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) }, + Some(h) => h.read(&mut HostIo::new(protocol, Some(s), event_loop, &mut self.channel), packet_id, &data[1..]) + } + }, + SessionData::None => {}, + } } _ => { warn!(target: "net", "Received event for unknown connection"); @@ -504,7 +592,62 @@ impl Handler for Host { FIRST_CONNECTION ... LAST_CONNECTION => self.connection_timeout(token, event_loop), NODETABLE_DISCOVERY => {}, NODETABLE_MAINTAIN => {}, - _ => panic!("Received unknown timer token"), + USER_TIMER ... LAST_USER_TIMER => { + let timer = self.timers.get_mut(token).expect("Unknown user timer token"); + if (*timer.handler)(&mut HostIo::new(timer.protocol, None, event_loop, &mut self.channel)) { + event_loop.timeout_ms(token, timer.delay as u64).expect("Unable to reregister user timer"); + } + } + _ => panic!("Unknown timer token"), + } + } + + fn notify(&mut self, event_loop: &mut EventLoop, msg: Self::Message) { + match msg { + HostMessage::Shutdown => event_loop.shutdown(), + HostMessage::AddHandler { + handler, + protocol, + versions + } => { + self.handlers.insert(protocol, handler); + for v in versions { + self.info.capabilities.push(CapabilityInfo { protocol: protocol, version: v, packet_count:0 }); + } + }, + HostMessage::Send { + peer, + packet_id, + protocol, + data, + } => { + match self.connections.get_mut(Token(peer as usize)) { + Some(&mut ConnectionEntry::Session(ref mut s)) => { + s.send_packet(protocol, packet_id as u8, &data).unwrap_or_else(|e| { + warn!(target: "net", "Send error: {:?}", e); + }); //TODO: don't copy vector data + }, + _ => { + warn!(target: "net", "Send: Peer does not exist"); + } + } + }, + HostMessage::AddTimer { + handler, + delay, + protocol, + } => { + match self.timers.insert(UserTimer { + handler: handler, + delay: delay, + protocol: protocol, + }) { + Ok(token) => { + event_loop.timeout_ms(token, delay as u64).expect("Error registering user timer"); + }, + _ => { panic!("Max timers reached") } + } + } } } } diff --git a/src/network/mod.rs b/src/network/mod.rs index 1199ab967..0b908a417 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -4,6 +4,7 @@ mod connection; mod handshake; mod session; mod discovery; +mod service; #[derive(Debug, Copy, Clone)] pub enum DisconnectReason @@ -31,6 +32,7 @@ pub enum Error { AddressParse(::std::net::AddrParseError), AddressResolve(Option<::std::io::Error>), NodeIdParse(::error::EthcoreError), + PeerNotFound, Disconnect(DisconnectReason) } @@ -61,7 +63,22 @@ impl From<::rlp::DecoderError> for Error { } } -pub fn start_host() -{ - let _ = host::Host::start(); +impl From<::mio::NotifyError> for Error { + fn from(_err: ::mio::NotifyError) -> Error { + Error::Io(::std::io::Error::new(::std::io::ErrorKind::ConnectionAborted, "Network IO notification error")) + } } + +pub type PeerId = host::PeerId; +pub type HandlerIo<'s> = host::HostIo<'s>; + +pub trait ProtocolHandler: Send { + fn initialize(&mut self, io: &mut HandlerIo); + fn read(&mut self, io: &mut HandlerIo, packet_id: u8, data: &[u8]); + fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId); + fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId); +} + +pub struct NetworkClient; + + diff --git a/src/network/service.rs b/src/network/service.rs new file mode 100644 index 000000000..1e245c20b --- /dev/null +++ b/src/network/service.rs @@ -0,0 +1,52 @@ +#![allow(dead_code)] //TODO: remove this after everything is done + +use std::thread::{self, JoinHandle}; +use mio::*; +use network::{Error, ProtocolHandler}; +use network::host::{Host, HostMessage, PeerId, PacketId, ProtocolId}; + +pub struct NetworkService { + thread: Option>, + host_channel: Sender +} + +impl NetworkService { + pub fn start() -> Result { + let mut event_loop = EventLoop::new().unwrap(); + let channel = event_loop.channel(); + let thread = thread::spawn(move || { + Host::start(&mut event_loop).unwrap(); //TODO: + }); + Ok(NetworkService { + thread: Some(thread), + host_channel: channel + }) + } + + pub fn send(&mut self, peer: &PeerId, packet_id: PacketId, protocol: ProtocolId, data: &[u8]) -> Result<(), Error> { + try!(self.host_channel.send(HostMessage::Send { + peer: *peer, + packet_id: packet_id, + protocol: protocol, + data: data.to_vec() + })); + Ok(()) + } + + pub fn register_protocol(&mut self, handler: Box, protocol: ProtocolId, versions: &[u8]) -> Result<(), Error> { + try!(self.host_channel.send(HostMessage::AddHandler { + handler: handler, + protocol: protocol, + versions: versions.to_vec(), + })); + Ok(()) + } +} + +impl Drop for NetworkService { + fn drop(&mut self) { + self.host_channel.send(HostMessage::Shutdown).unwrap(); + self.thread.take().unwrap().join().unwrap(); + } +} + diff --git a/src/network/session.rs b/src/network/session.rs index eb6ca6ced..698f8f6ac 100644 --- a/src/network/session.rs +++ b/src/network/session.rs @@ -14,11 +14,44 @@ pub struct Session { had_hello: bool, } +pub enum SessionData { + None, + Ready, + Packet { + data: Vec, + protocol: &'static str, + packet_id: u8, + }, +} + pub struct SessionInfo { pub id: NodeId, pub client_version: String, pub protocol_version: u32, - pub capabilities: Vec, + pub capabilities: Vec, +} + +#[derive(Debug, PartialEq, Eq)] +pub struct PeerCapabilityInfo { + pub protocol: String, + pub version: u8, +} + +impl Decodable for PeerCapabilityInfo { + fn decode_untrusted(rlp: &UntrustedRlp) -> Result { + Ok(PeerCapabilityInfo { + protocol: try!(String::decode_untrusted(&try!(rlp.at(0)))), + version: try!(u32::decode_untrusted(&try!(rlp.at(1)))) as u8, + }) + } +} + +#[derive(Debug, PartialEq, Eq)] +pub struct SessionCapabilityInfo { + pub protocol: &'static str, + pub version: u8, + pub packet_count: u8, + pub id_offset: u8, } const PACKET_HELLO: u8 = 0x80; @@ -50,43 +83,76 @@ impl Session { Ok(session) } - pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result<(), Error> { + pub fn readable(&mut self, event_loop: &mut EventLoop, host: &HostInfo) -> Result { match try!(self.connection.readable(event_loop)) { - Some(data) => { - try!(self.read_packet(data, host)); - }, - None => {} - }; - Ok(()) + Some(data) => self.read_packet(data, host), + None => Ok(SessionData::None) + } } pub fn writable(&mut self, event_loop: &mut EventLoop, _host: &HostInfo) -> Result<(), Error> { self.connection.writable(event_loop) } - pub fn read_packet(&mut self, packet: Packet, host: &HostInfo) -> Result<(), Error> { - let data = &packet.data; - if data.len() < 2 { + pub fn have_capability(&self, protocol: &str) -> bool { + self.info.capabilities.iter().any(|c| c.protocol == protocol) + } + + pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), Error> { + let mut i = 0usize; + while protocol != self.info.capabilities[i].protocol { + i += 1; + if i == self.info.capabilities.len() { + debug!(target: "net", "Unkown protocol: {:?}", protocol); + return Ok(()) + } + } + let pid = self.info.capabilities[i].id_offset + packet_id; + let mut rlp = RlpStream::new(); + rlp.append(&(pid as u32)); + rlp.append_raw(data, 1); + self.connection.send_packet(&rlp.out()) + } + + fn read_packet(&mut self, packet: Packet, host: &HostInfo) -> Result { + if packet.data.len() < 2 { return Err(Error::BadProtocol); } - let packet_id = data[0]; - let rlp = UntrustedRlp::new(&data[1..]); //TODO: validate rlp expected size + let packet_id = packet.data[0]; if packet_id != PACKET_HELLO && packet_id != PACKET_DISCONNECT && !self.had_hello { return Err(Error::BadProtocol); } match packet_id { - PACKET_HELLO => self.read_hello(&rlp, host), + PACKET_HELLO => { + let rlp = UntrustedRlp::new(&packet.data[1..]); //TODO: validate rlp expected size + try!(self.read_hello(&rlp, host)); + Ok(SessionData::Ready) + } PACKET_DISCONNECT => Err(Error::Disconnect(DisconnectReason::DisconnectRequested)), - PACKET_PING => self.write_pong(), - PACKET_GET_PEERS => Ok(()), //TODO; - PACKET_PEERS => Ok(()), + PACKET_PING => { + try!(self.write_pong()); + Ok(SessionData::None) + } + PACKET_GET_PEERS => Ok(SessionData::None), //TODO; + PACKET_PEERS => Ok(SessionData::None), PACKET_USER ... PACKET_LAST => { - warn!(target: "net", "User packet: {:?}", rlp); - Ok(()) + let mut i = 0usize; + while packet_id < self.info.capabilities[i].id_offset { + i += 1; + if i == self.info.capabilities.len() { + debug!(target: "net", "Unkown packet: {:?}", packet_id); + return Ok(SessionData::None) + } + } + + // map to protocol + let protocol = self.info.capabilities[i].protocol; + let pid = packet_id - self.info.capabilities[i].id_offset; + return Ok(SessionData::Packet { data: packet.data, protocol: protocol, packet_id: pid } ) }, _ => { - debug!(target: "net", "Unkown packet: {:?}", rlp); - Ok(()) + debug!(target: "net", "Unkown packet: {:?}", packet_id); + Ok(SessionData::None) } } } @@ -106,12 +172,24 @@ impl Session { fn read_hello(&mut self, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), Error> { let protocol = try!(u32::decode_untrusted(&try!(rlp.at(0)))); let client_version = try!(String::decode_untrusted(&try!(rlp.at(1)))); - let mut caps: Vec = try!(Decodable::decode_untrusted(&try!(rlp.at(2)))); + let peer_caps: Vec = try!(Decodable::decode_untrusted(&try!(rlp.at(2)))); let id = try!(NodeId::decode_untrusted(&try!(rlp.at(4)))); // Intersect with host capabilities // Leave only highset mutually supported capability version - caps.retain(|c| host.capabilities.contains(&c)); + let mut caps: Vec = Vec::new(); + for hc in host.capabilities.iter() { + if peer_caps.iter().any(|c| c.protocol == hc.protocol && c.version == hc.version) { + caps.push(SessionCapabilityInfo { + protocol: hc.protocol, + version: hc.version, + id_offset: 0, + packet_count: hc.packet_count, + }); + } + } + + caps.retain(|c| host.capabilities.iter().any(|hc| hc.protocol == c.protocol && hc.version == c.version)); let mut i = 0; while i < caps.len() { if caps.iter().any(|c| c.protocol == caps[i].protocol && c.version > caps[i].version) { @@ -122,7 +200,15 @@ impl Session { } } + i = 0; + let mut offset: u8 = PACKET_USER; + while i < caps.len() { + caps[i].id_offset = offset; + offset += caps[i].packet_count; + i += 1; + } trace!(target: "net", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps); + self.info.capabilities = caps; if protocol != host.protocol_version { return Err(self.disconnect(DisconnectReason::UselessPeer)); }