diff --git a/Cargo.toml b/Cargo.toml index 85a1f35a6..0e584d8d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,12 +11,12 @@ log = "0.3" env_logger = "0.3" rustc-serialize = "0.3" arrayvec = "0.3" -mio = "0.5.*" +mio = "0.5.0" rand = "0.3.12" time = "0.1.34" tiny-keccak = "1.0" rocksdb = "0.2.1" -lazy_static = "0.1.*" +lazy_static = "0.1.0" eth-secp256k1 = { git = "https://github.com/arkpar/rust-secp256k1.git" } rust-crypto = "0.2.34" elastic-array = "0.4" diff --git a/src/network/host.rs b/src/network/host.rs index 6c37bb1d3..68430c688 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -23,7 +23,7 @@ const MAX_USER_TIMERS: usize = 32; const IDEAL_PEERS:u32 = 10; pub type NodeId = H512; -type TimerFun = Fn(&mut HostIo) -> bool + Send; +pub type TimerToken = usize; #[derive(Debug)] struct NetworkConfiguration { @@ -146,7 +146,7 @@ const LAST_CONNECTION: usize = FIRST_CONNECTION + MAX_CONNECTIONS - 1; const USER_TIMER: usize = LAST_CONNECTION; const LAST_USER_TIMER: usize = USER_TIMER + MAX_USER_TIMERS - 1; -pub type PacketId = u32; +pub type PacketId = u8; pub type ProtocolId = &'static str; pub enum HostMessage { @@ -162,14 +162,9 @@ pub enum HostMessage { protocol: ProtocolId, data: Vec, }, - AddTimer { - handler: Box, - delay: u32, - protocol: ProtocolId, - } } -pub type PeerId = u32; +pub type PeerId = usize; #[derive(Debug, PartialEq, Eq)] pub struct CapabilityInfo { @@ -189,54 +184,67 @@ impl Encodable for CapabilityInfo { pub struct HostIo<'s> { protocol: ProtocolId, - session: Option<&'s mut Session>, + connections: &'s mut Slab, + timers: &'s mut Slab, + session: Option, event_loop: &'s mut EventLoop, - channel: &'s mut Sender } impl<'s> HostIo<'s> { - fn new(protocol: ProtocolId, session: Option<&'s mut Session>, event_loop: &'s mut EventLoop, channel: &'s mut Sender) -> HostIo<'s> { + fn new(protocol: ProtocolId, session: Option, event_loop: &'s mut EventLoop, connections: &'s mut Slab, timers: &'s mut Slab) -> HostIo<'s> { HostIo { protocol: protocol, session: session, event_loop: event_loop, - channel: channel, + connections: connections, + timers: timers, } } pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), Error> { - try!(self.channel.send(HostMessage::Send { - peer: peer, - packet_id: packet_id, - protocol: self.protocol, - data: data - })); + match self.connections.get_mut(Token(peer)) { + Some(&mut ConnectionEntry::Session(ref mut s)) => { + s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| { + warn!(target: "net", "Send error: {:?}", e); + }); //TODO: don't copy vector data + }, + _ => { + warn!(target: "net", "Send: Peer does not exist"); + } + } Ok(()) } - pub fn respond(&mut self, packet_id: PacketId, data: &[u8]) -> Result<(), Error> { - match self.session.as_mut() { - Some(session) => session.send_packet(self.protocol, packet_id as u8, data), + pub fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), Error> { + match self.session { + Some(session) => self.send(session.as_usize(), packet_id, data), None => { panic!("Respond: Session does not exist") } } } - fn register_timer(&mut self, ms: u32, handler: Box) -> Result<(), Error>{ - try!(self.channel.send(HostMessage::AddTimer { - delay: ms, - handler: handler, - protocol: self.protocol, - })); - Ok(()) + pub fn register_timer(&mut self, ms: u64) -> Result{ + match self.timers.insert(UserTimer { + delay: ms, + protocol: self.protocol, + }) { + Ok(token) => { + self.event_loop.timeout_ms(token, ms).expect("Error registering user timer"); + Ok(token.as_usize()) + }, + _ => { panic!("Max timers reached") } + } + } + + pub fn disable_peer(&mut self, _peer: PeerId) { + //TODO: remove capability, disconnect if no capabilities left } } struct UserTimer { - handler: Box, protocol: ProtocolId, - delay: u32, + delay: u64, } pub struct HostInfo { @@ -492,6 +500,8 @@ impl Host { fn connection_readable(&mut self, token: Token, event_loop: &mut EventLoop) { let mut kill = false; let mut create_session = false; + let mut ready_data: Vec = Vec::new(); + let mut packet_data: Option<(ProtocolId, PacketId, Vec)> = None; { match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { @@ -509,9 +519,9 @@ impl Host { }) }; match sd { SessionData::Ready => { - for (p, h) in self.handlers.iter_mut() { + for (p, _) in self.handlers.iter_mut() { if s.have_capability(p) { - h.connected(&mut HostIo::new(p, Some(s), event_loop, &mut self.channel), &(token.as_usize() as u32)); + ready_data.push(p); } } }, @@ -522,7 +532,7 @@ impl Host { } => { match self.handlers.get_mut(protocol) { None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) }, - Some(h) => h.read(&mut HostIo::new(protocol, Some(s), event_loop, &mut self.channel), packet_id, &data[1..]) + Some(_) => packet_data = Some((protocol, packet_id, data)), } }, SessionData::None => {}, @@ -539,6 +549,15 @@ impl Host { if create_session { self.start_session(token, event_loop); } + for p in ready_data { + let mut h = self.handlers.get_mut(p).unwrap(); + h.connected(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize()); + } + if let Some((p, packet_id, data)) = packet_data { + let mut h = self.handlers.get_mut(p).unwrap(); + h.read(&mut HostIo::new(p, Some(token), event_loop, &mut self.connections, &mut self.timers), &token.as_usize(), packet_id, &data[1..]); + } + } fn start_session(&mut self, token: Token, event_loop: &mut EventLoop) { @@ -593,9 +612,10 @@ impl Handler for Host { NODETABLE_DISCOVERY => {}, NODETABLE_MAINTAIN => {}, USER_TIMER ... LAST_USER_TIMER => { - let timer = self.timers.get_mut(token).expect("Unknown user timer token"); - if (*timer.handler)(&mut HostIo::new(timer.protocol, None, event_loop, &mut self.channel)) { - event_loop.timeout_ms(token, timer.delay as u64).expect("Unable to reregister user timer"); + let protocol = self.timers.get_mut(token).expect("Unknown user timer token").protocol; + match self.handlers.get_mut(protocol) { + None => { warn!(target: "net", "No handler found for protocol: {:?}", protocol) }, + Some(h) => h.timeout(&mut HostIo::new(protocol, None, event_loop, &mut self.connections, &mut self.timers), token.as_usize()), } } _ => panic!("Unknown timer token"), @@ -632,22 +652,6 @@ impl Handler for Host { } } }, - HostMessage::AddTimer { - handler, - delay, - protocol, - } => { - match self.timers.insert(UserTimer { - handler: handler, - delay: delay, - protocol: protocol, - }) { - Ok(token) => { - event_loop.timeout_ms(token, delay as u64).expect("Error registering user timer"); - }, - _ => { panic!("Max timers reached") } - } - } } } } diff --git a/src/network/mod.rs b/src/network/mod.rs index 0b908a417..62928b020 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -47,6 +47,7 @@ impl From<::crypto::CryptoError> for Error { Error::Crypto(err) } } + impl From<::std::net::AddrParseError> for Error { fn from(err: ::std::net::AddrParseError) -> Error { Error::AddressParse(err) @@ -70,15 +71,19 @@ impl From<::mio::NotifyError> for Error { } pub type PeerId = host::PeerId; +pub type PacketId = host::PacketId; +pub type TimerToken = host::TimerToken; pub type HandlerIo<'s> = host::HostIo<'s>; pub trait ProtocolHandler: Send { fn initialize(&mut self, io: &mut HandlerIo); - fn read(&mut self, io: &mut HandlerIo, packet_id: u8, data: &[u8]); + fn read(&mut self, io: &mut HandlerIo, peer: &PeerId, packet_id: u8, data: &[u8]); fn connected(&mut self, io: &mut HandlerIo, peer: &PeerId); fn disconnected(&mut self, io: &mut HandlerIo, peer: &PeerId); + fn timeout(&mut self, io: &mut HandlerIo, timer: TimerToken); } pub struct NetworkClient; +pub type NetworkService = service::NetworkService;