diff --git a/Cargo.toml b/Cargo.toml index 6e32edcab..b7dca6e1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ rust-crypto = "0.2.34" elastic-array = "0.4" heapsize = "0.2" itertools = "0.4" +slab = { git = "https://github.com/arkpar/slab.git" } [dev-dependencies] json-tests = { path = "json-tests" } diff --git a/src/lib.rs b/src/lib.rs index 4b0108c6f..fca71a100 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,6 +29,7 @@ //! sudo make install //! ``` +extern crate slab; extern crate rustc_serialize; extern crate mio; extern crate rand; diff --git a/src/network/host.rs b/src/network/host.rs index abe1cbfcc..6b3dfbfd1 100644 --- a/src/network/host.rs +++ b/src/network/host.rs @@ -4,7 +4,6 @@ use std::collections::{HashMap}; use std::hash::{Hasher}; use std::str::{FromStr}; use mio::*; -use mio::util::{Slab}; use mio::tcp::*; use mio::udp::*; use hash::*; @@ -17,6 +16,9 @@ use error::*; use io::*; use network::NetworkProtocolHandler; use network::node::*; +use slab::Index; + +type Slab = ::slab::Slab; const _DEFAULT_PORT: u16 = 30304; @@ -128,7 +130,7 @@ impl<'s, 'io, Message> NetworkContext<'s, 'io, Message> where Message: Send + 's /// Send a packet over the network to another peer. pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec) -> Result<(), UtilError> { - match self.connections.get_mut(Token(peer)) { + match self.connections.get_mut(peer) { Some(&mut ConnectionEntry::Session(ref mut s)) => { s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| { warn!(target: "net", "Send error: {:?}", e); @@ -169,7 +171,7 @@ impl<'s, 'io, Message> NetworkContext<'s, 'io, Message> where Message: Send + 's /// Returns peer identification string pub fn peer_info(&self, peer: PeerId) -> String { - match self.connections.get(Token(peer)) { + match self.connections.get(peer) { Some(&ConnectionEntry::Session(ref s)) => { s.info.client_version.clone() }, @@ -251,7 +253,7 @@ impl Host where Message: Send { }, udp_socket: udp_socket, listener: listener, - connections: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS), + connections: Slab::new_starting_at(FIRST_CONNECTION, MAX_CONNECTIONS), timers: HashMap::new(), nodes: HashMap::new(), handlers: HashMap::new(), @@ -353,8 +355,9 @@ impl Host where Message: Send { }; let nonce = self.info.next_nonce(); - match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(token, id, socket, &nonce).expect("Can't create handshake"))) { + match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(Token(token), id, socket, &nonce).expect("Can't create handshake"))) { Some(token) => { + warn!(target: "slab", "inserted {}", token.as_usize()); match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { h.start(&self.info, true) @@ -378,7 +381,7 @@ impl Host where Message: Send { fn connection_writable<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { let mut kill = false; let mut create_session = false; - match self.connections.get_mut(Token(token)) { + match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { h.writable(io.event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Handshake write error: {:?}", e); @@ -402,7 +405,7 @@ impl Host where Message: Send { } else if create_session { self.start_session(token, io); } - match self.connections.get_mut(Token(token)) { + match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Session(ref mut s)) => { s.reregister(io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); }, @@ -419,7 +422,7 @@ impl Host where Message: Send { let mut create_session = false; let mut ready_data: Vec = Vec::new(); let mut packet_data: Option<(ProtocolId, PacketId, Vec)> = None; - match self.connections.get_mut(Token(token)) { + match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(ref mut h)) => { h.readable(io.event_loop, &self.info).unwrap_or_else(|e| { debug!(target: "net", "Handshake read error: {:?}", e); @@ -474,7 +477,7 @@ impl Host where Message: Send { h.read(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token, packet_id, &data[1..]); } - match self.connections.get_mut(Token(token)) { + match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Session(ref mut s)) => { s.reregister(io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e)); }, @@ -485,11 +488,12 @@ impl Host where Message: Send { fn start_session(&mut self, token: StreamToken, io: &mut IoContext>) { let info = &self.info; // TODO: use slab::replace_with (currently broken) - match self.connections.remove(Token(token)) { + /* + match self.connections.remove(token) { Some(ConnectionEntry::Handshake(h)) => { match Session::new(h, io.event_loop, info) { Ok(session) => { - assert!(Token(token) == self.connections.insert(ConnectionEntry::Session(session)).ok().unwrap()); + assert!(token == self.connections.insert(ConnectionEntry::Session(session)).ok().unwrap()); }, Err(e) => { debug!(target: "net", "Session construction error: {:?}", e); @@ -497,9 +501,9 @@ impl Host where Message: Send { } }, _ => panic!("Error updating slab with session") - } - /* - self.connections.replace_with(Token(token), |c| { + }*/ + warn!(target: "slab", "replaced {}", token.as_usize()); + self.connections.replace_with(token, |c| { match c { ConnectionEntry::Handshake(h) => Session::new(h, io.event_loop, info) .map(|s| Some(ConnectionEntry::Session(s))) @@ -510,7 +514,6 @@ impl Host where Message: Send { _ => { panic!("No handshake to create a session from"); } } }).expect("Error updating slab with session"); - */ } fn connection_timeout<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { @@ -519,7 +522,8 @@ impl Host where Message: Send { fn kill_connection<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage>) { let mut to_disconnect: Vec = Vec::new(); - match self.connections.get_mut(Token(token)) { + let mut remove = true; + match self.connections.get_mut(token) { Some(&mut ConnectionEntry::Handshake(_)) => (), // just abandon handshake Some(&mut ConnectionEntry::Session(ref mut s)) if s.is_ready() => { for (p, _) in self.handlers.iter_mut() { @@ -528,13 +532,18 @@ impl Host where Message: Send { } } }, - _ => (), + _ => { + remove = false; + }, } for p in to_disconnect { let mut h = self.handlers.get_mut(p).unwrap(); h.disconnected(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token); } - self.connections.remove(Token(token)); + if remove { + self.connections.remove(token); + warn!(target: "slab", "removed {}", token); + } } } @@ -626,7 +635,7 @@ impl IoHandler> for Host where Messa ref protocol, ref data, } => { - match self.connections.get_mut(Token(*peer as usize)) { + match self.connections.get_mut(*peer as usize) { Some(&mut ConnectionEntry::Session(ref mut s)) => { s.send_packet(protocol, *packet_id as u8, &data).unwrap_or_else(|e| { warn!(target: "net", "Send error: {:?}", e); diff --git a/src/network/session.rs b/src/network/session.rs index 2d8ba2245..828e4b062 100644 --- a/src/network/session.rs +++ b/src/network/session.rs @@ -242,6 +242,7 @@ impl Session { i += 1; } trace!(target: "net", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps); + self.info.client_version = client_version; self.info.capabilities = caps; if protocol != host.protocol_version { return Err(From::from(self.disconnect(DisconnectReason::UselessPeer)));