Slab bug workaround
This commit is contained in:
parent
3b557cdc07
commit
2d36062794
@ -22,6 +22,7 @@ rust-crypto = "0.2.34"
|
|||||||
elastic-array = "0.4"
|
elastic-array = "0.4"
|
||||||
heapsize = "0.2"
|
heapsize = "0.2"
|
||||||
itertools = "0.4"
|
itertools = "0.4"
|
||||||
|
slab = { git = "https://github.com/arkpar/slab.git" }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
json-tests = { path = "json-tests" }
|
json-tests = { path = "json-tests" }
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
//! sudo make install
|
//! sudo make install
|
||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
|
extern crate slab;
|
||||||
extern crate rustc_serialize;
|
extern crate rustc_serialize;
|
||||||
extern crate mio;
|
extern crate mio;
|
||||||
extern crate rand;
|
extern crate rand;
|
||||||
|
@ -4,7 +4,6 @@ use std::collections::{HashMap};
|
|||||||
use std::hash::{Hasher};
|
use std::hash::{Hasher};
|
||||||
use std::str::{FromStr};
|
use std::str::{FromStr};
|
||||||
use mio::*;
|
use mio::*;
|
||||||
use mio::util::{Slab};
|
|
||||||
use mio::tcp::*;
|
use mio::tcp::*;
|
||||||
use mio::udp::*;
|
use mio::udp::*;
|
||||||
use hash::*;
|
use hash::*;
|
||||||
@ -17,6 +16,9 @@ use error::*;
|
|||||||
use io::*;
|
use io::*;
|
||||||
use network::NetworkProtocolHandler;
|
use network::NetworkProtocolHandler;
|
||||||
use network::node::*;
|
use network::node::*;
|
||||||
|
use slab::Index;
|
||||||
|
|
||||||
|
type Slab<T> = ::slab::Slab<T, usize>;
|
||||||
|
|
||||||
const _DEFAULT_PORT: u16 = 30304;
|
const _DEFAULT_PORT: u16 = 30304;
|
||||||
|
|
||||||
@ -128,7 +130,7 @@ impl<'s, 'io, Message> NetworkContext<'s, 'io, Message> where Message: Send + 's
|
|||||||
|
|
||||||
/// Send a packet over the network to another peer.
|
/// Send a packet over the network to another peer.
|
||||||
pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
|
pub fn send(&mut self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
|
||||||
match self.connections.get_mut(Token(peer)) {
|
match self.connections.get_mut(peer) {
|
||||||
Some(&mut ConnectionEntry::Session(ref mut s)) => {
|
Some(&mut ConnectionEntry::Session(ref mut s)) => {
|
||||||
s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| {
|
s.send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| {
|
||||||
warn!(target: "net", "Send error: {:?}", e);
|
warn!(target: "net", "Send error: {:?}", e);
|
||||||
@ -169,7 +171,7 @@ impl<'s, 'io, Message> NetworkContext<'s, 'io, Message> where Message: Send + 's
|
|||||||
|
|
||||||
/// Returns peer identification string
|
/// Returns peer identification string
|
||||||
pub fn peer_info(&self, peer: PeerId) -> String {
|
pub fn peer_info(&self, peer: PeerId) -> String {
|
||||||
match self.connections.get(Token(peer)) {
|
match self.connections.get(peer) {
|
||||||
Some(&ConnectionEntry::Session(ref s)) => {
|
Some(&ConnectionEntry::Session(ref s)) => {
|
||||||
s.info.client_version.clone()
|
s.info.client_version.clone()
|
||||||
},
|
},
|
||||||
@ -251,7 +253,7 @@ impl<Message> Host<Message> where Message: Send {
|
|||||||
},
|
},
|
||||||
udp_socket: udp_socket,
|
udp_socket: udp_socket,
|
||||||
listener: listener,
|
listener: listener,
|
||||||
connections: Slab::new_starting_at(Token(FIRST_CONNECTION), MAX_CONNECTIONS),
|
connections: Slab::new_starting_at(FIRST_CONNECTION, MAX_CONNECTIONS),
|
||||||
timers: HashMap::new(),
|
timers: HashMap::new(),
|
||||||
nodes: HashMap::new(),
|
nodes: HashMap::new(),
|
||||||
handlers: HashMap::new(),
|
handlers: HashMap::new(),
|
||||||
@ -353,8 +355,9 @@ impl<Message> Host<Message> where Message: Send {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let nonce = self.info.next_nonce();
|
let nonce = self.info.next_nonce();
|
||||||
match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(token, id, socket, &nonce).expect("Can't create handshake"))) {
|
match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(Token(token), id, socket, &nonce).expect("Can't create handshake"))) {
|
||||||
Some(token) => {
|
Some(token) => {
|
||||||
|
warn!(target: "slab", "inserted {}", token.as_usize());
|
||||||
match self.connections.get_mut(token) {
|
match self.connections.get_mut(token) {
|
||||||
Some(&mut ConnectionEntry::Handshake(ref mut h)) => {
|
Some(&mut ConnectionEntry::Handshake(ref mut h)) => {
|
||||||
h.start(&self.info, true)
|
h.start(&self.info, true)
|
||||||
@ -378,7 +381,7 @@ impl<Message> Host<Message> where Message: Send {
|
|||||||
fn connection_writable<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage<Message>>) {
|
fn connection_writable<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage<Message>>) {
|
||||||
let mut kill = false;
|
let mut kill = false;
|
||||||
let mut create_session = false;
|
let mut create_session = false;
|
||||||
match self.connections.get_mut(Token(token)) {
|
match self.connections.get_mut(token) {
|
||||||
Some(&mut ConnectionEntry::Handshake(ref mut h)) => {
|
Some(&mut ConnectionEntry::Handshake(ref mut h)) => {
|
||||||
h.writable(io.event_loop, &self.info).unwrap_or_else(|e| {
|
h.writable(io.event_loop, &self.info).unwrap_or_else(|e| {
|
||||||
debug!(target: "net", "Handshake write error: {:?}", e);
|
debug!(target: "net", "Handshake write error: {:?}", e);
|
||||||
@ -402,7 +405,7 @@ impl<Message> Host<Message> where Message: Send {
|
|||||||
} else if create_session {
|
} else if create_session {
|
||||||
self.start_session(token, io);
|
self.start_session(token, io);
|
||||||
}
|
}
|
||||||
match self.connections.get_mut(Token(token)) {
|
match self.connections.get_mut(token) {
|
||||||
Some(&mut ConnectionEntry::Session(ref mut s)) => {
|
Some(&mut ConnectionEntry::Session(ref mut s)) => {
|
||||||
s.reregister(io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e));
|
s.reregister(io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e));
|
||||||
},
|
},
|
||||||
@ -419,7 +422,7 @@ impl<Message> Host<Message> where Message: Send {
|
|||||||
let mut create_session = false;
|
let mut create_session = false;
|
||||||
let mut ready_data: Vec<ProtocolId> = Vec::new();
|
let mut ready_data: Vec<ProtocolId> = Vec::new();
|
||||||
let mut packet_data: Option<(ProtocolId, PacketId, Vec<u8>)> = None;
|
let mut packet_data: Option<(ProtocolId, PacketId, Vec<u8>)> = None;
|
||||||
match self.connections.get_mut(Token(token)) {
|
match self.connections.get_mut(token) {
|
||||||
Some(&mut ConnectionEntry::Handshake(ref mut h)) => {
|
Some(&mut ConnectionEntry::Handshake(ref mut h)) => {
|
||||||
h.readable(io.event_loop, &self.info).unwrap_or_else(|e| {
|
h.readable(io.event_loop, &self.info).unwrap_or_else(|e| {
|
||||||
debug!(target: "net", "Handshake read error: {:?}", e);
|
debug!(target: "net", "Handshake read error: {:?}", e);
|
||||||
@ -474,7 +477,7 @@ impl<Message> Host<Message> where Message: Send {
|
|||||||
h.read(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token, packet_id, &data[1..]);
|
h.read(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token, packet_id, &data[1..]);
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.connections.get_mut(Token(token)) {
|
match self.connections.get_mut(token) {
|
||||||
Some(&mut ConnectionEntry::Session(ref mut s)) => {
|
Some(&mut ConnectionEntry::Session(ref mut s)) => {
|
||||||
s.reregister(io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e));
|
s.reregister(io.event_loop).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e));
|
||||||
},
|
},
|
||||||
@ -485,11 +488,12 @@ impl<Message> Host<Message> where Message: Send {
|
|||||||
fn start_session(&mut self, token: StreamToken, io: &mut IoContext<NetworkIoMessage<Message>>) {
|
fn start_session(&mut self, token: StreamToken, io: &mut IoContext<NetworkIoMessage<Message>>) {
|
||||||
let info = &self.info;
|
let info = &self.info;
|
||||||
// TODO: use slab::replace_with (currently broken)
|
// TODO: use slab::replace_with (currently broken)
|
||||||
match self.connections.remove(Token(token)) {
|
/*
|
||||||
|
match self.connections.remove(token) {
|
||||||
Some(ConnectionEntry::Handshake(h)) => {
|
Some(ConnectionEntry::Handshake(h)) => {
|
||||||
match Session::new(h, io.event_loop, info) {
|
match Session::new(h, io.event_loop, info) {
|
||||||
Ok(session) => {
|
Ok(session) => {
|
||||||
assert!(Token(token) == self.connections.insert(ConnectionEntry::Session(session)).ok().unwrap());
|
assert!(token == self.connections.insert(ConnectionEntry::Session(session)).ok().unwrap());
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!(target: "net", "Session construction error: {:?}", e);
|
debug!(target: "net", "Session construction error: {:?}", e);
|
||||||
@ -497,9 +501,9 @@ impl<Message> Host<Message> where Message: Send {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
_ => panic!("Error updating slab with session")
|
_ => panic!("Error updating slab with session")
|
||||||
}
|
}*/
|
||||||
/*
|
warn!(target: "slab", "replaced {}", token.as_usize());
|
||||||
self.connections.replace_with(Token(token), |c| {
|
self.connections.replace_with(token, |c| {
|
||||||
match c {
|
match c {
|
||||||
ConnectionEntry::Handshake(h) => Session::new(h, io.event_loop, info)
|
ConnectionEntry::Handshake(h) => Session::new(h, io.event_loop, info)
|
||||||
.map(|s| Some(ConnectionEntry::Session(s)))
|
.map(|s| Some(ConnectionEntry::Session(s)))
|
||||||
@ -510,7 +514,6 @@ impl<Message> Host<Message> where Message: Send {
|
|||||||
_ => { panic!("No handshake to create a session from"); }
|
_ => { panic!("No handshake to create a session from"); }
|
||||||
}
|
}
|
||||||
}).expect("Error updating slab with session");
|
}).expect("Error updating slab with session");
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn connection_timeout<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage<Message>>) {
|
fn connection_timeout<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage<Message>>) {
|
||||||
@ -519,7 +522,8 @@ impl<Message> Host<Message> where Message: Send {
|
|||||||
|
|
||||||
fn kill_connection<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage<Message>>) {
|
fn kill_connection<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage<Message>>) {
|
||||||
let mut to_disconnect: Vec<ProtocolId> = Vec::new();
|
let mut to_disconnect: Vec<ProtocolId> = Vec::new();
|
||||||
match self.connections.get_mut(Token(token)) {
|
let mut remove = true;
|
||||||
|
match self.connections.get_mut(token) {
|
||||||
Some(&mut ConnectionEntry::Handshake(_)) => (), // just abandon handshake
|
Some(&mut ConnectionEntry::Handshake(_)) => (), // just abandon handshake
|
||||||
Some(&mut ConnectionEntry::Session(ref mut s)) if s.is_ready() => {
|
Some(&mut ConnectionEntry::Session(ref mut s)) if s.is_ready() => {
|
||||||
for (p, _) in self.handlers.iter_mut() {
|
for (p, _) in self.handlers.iter_mut() {
|
||||||
@ -528,13 +532,18 @@ impl<Message> Host<Message> where Message: Send {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
_ => (),
|
_ => {
|
||||||
|
remove = false;
|
||||||
|
},
|
||||||
}
|
}
|
||||||
for p in to_disconnect {
|
for p in to_disconnect {
|
||||||
let mut h = self.handlers.get_mut(p).unwrap();
|
let mut h = self.handlers.get_mut(p).unwrap();
|
||||||
h.disconnected(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token);
|
h.disconnected(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token);
|
||||||
}
|
}
|
||||||
self.connections.remove(Token(token));
|
if remove {
|
||||||
|
self.connections.remove(token);
|
||||||
|
warn!(target: "slab", "removed {}", token);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -626,7 +635,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
ref protocol,
|
ref protocol,
|
||||||
ref data,
|
ref data,
|
||||||
} => {
|
} => {
|
||||||
match self.connections.get_mut(Token(*peer as usize)) {
|
match self.connections.get_mut(*peer as usize) {
|
||||||
Some(&mut ConnectionEntry::Session(ref mut s)) => {
|
Some(&mut ConnectionEntry::Session(ref mut s)) => {
|
||||||
s.send_packet(protocol, *packet_id as u8, &data).unwrap_or_else(|e| {
|
s.send_packet(protocol, *packet_id as u8, &data).unwrap_or_else(|e| {
|
||||||
warn!(target: "net", "Send error: {:?}", e);
|
warn!(target: "net", "Send error: {:?}", e);
|
||||||
|
@ -242,6 +242,7 @@ impl Session {
|
|||||||
i += 1;
|
i += 1;
|
||||||
}
|
}
|
||||||
trace!(target: "net", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps);
|
trace!(target: "net", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps);
|
||||||
|
self.info.client_version = client_version;
|
||||||
self.info.capabilities = caps;
|
self.info.capabilities = caps;
|
||||||
if protocol != host.protocol_version {
|
if protocol != host.protocol_version {
|
||||||
return Err(From::from(self.disconnect(DisconnectReason::UselessPeer)));
|
return Err(From::from(self.disconnect(DisconnectReason::UselessPeer)));
|
||||||
|
Loading…
Reference in New Issue
Block a user