Minor fixes

This commit is contained in:
arkpar 2016-01-14 16:52:10 +01:00
parent 2ceffb425c
commit a2f13e1efb
2 changed files with 35 additions and 1 deletions

View File

@ -136,6 +136,7 @@ impl<Message> Handler for IoManager<Message> where Message: Send + 'static {
handler, handler,
} => { } => {
self.handlers.push(handler); self.handlers.push(handler);
self.handlers.last_mut().unwrap().initialize(&mut IoContext::new(event_loop, &mut self.timers));
}, },
IoMessage::UserMessage(ref mut data) => { IoMessage::UserMessage(ref mut data) => {
for h in self.handlers.iter_mut() { for h in self.handlers.iter_mut() {

View File

@ -23,6 +23,8 @@ const _DEFAULT_PORT: u16 = 30304;
const MAX_CONNECTIONS: usize = 1024; const MAX_CONNECTIONS: usize = 1024;
const IDEAL_PEERS: u32 = 10; const IDEAL_PEERS: u32 = 10;
const MAINTENANCE_TIMEOUT: u64 = 1000;
#[derive(Debug)] #[derive(Debug)]
struct NetworkConfiguration { struct NetworkConfiguration {
listen_address: SocketAddr, listen_address: SocketAddr,
@ -164,6 +166,18 @@ impl<'s, 'io, Message> NetworkContext<'s, 'io, Message> where Message: Send + 's
e @ Err(_) => e, e @ Err(_) => e,
} }
} }
/// Returns peer identification string
pub fn peer_info(&self, peer: PeerId) -> String {
match self.connections.get(Token(peer)) {
Some(&ConnectionEntry::Session(ref s)) => {
s.info.client_version.clone()
},
_ => {
"unknown".to_string()
}
}
}
} }
/// Shared host information /// Shared host information
@ -255,6 +269,7 @@ impl<Message> Host<Message> where Message: Send {
fn maintain_network(&mut self, io: &mut IoContext<NetworkIoMessage<Message>>) { fn maintain_network(&mut self, io: &mut IoContext<NetworkIoMessage<Message>>) {
self.connect_peers(io); self.connect_peers(io);
io.event_loop.timeout_ms(Token(IDLE), MAINTENANCE_TIMEOUT).unwrap();
} }
fn have_session(&self, id: &NodeId) -> bool { fn have_session(&self, id: &NodeId) -> bool {
@ -340,6 +355,7 @@ impl<Message> Host<Message> where Message: Send {
let nonce = self.info.next_nonce(); let nonce = self.info.next_nonce();
match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(token, id, socket, &nonce).expect("Can't create handshake"))) { match self.connections.insert_with(|token| ConnectionEntry::Handshake(Handshake::new(token, id, socket, &nonce).expect("Can't create handshake"))) {
Some(token) => { Some(token) => {
warn!(target: "slab", "inserted {}", token.as_usize());
match self.connections.get_mut(token) { match self.connections.get_mut(token) {
Some(&mut ConnectionEntry::Handshake(ref mut h)) => { Some(&mut ConnectionEntry::Handshake(ref mut h)) => {
h.start(&self.info, true) h.start(&self.info, true)
@ -469,6 +485,21 @@ impl<Message> Host<Message> where Message: Send {
fn start_session(&mut self, token: StreamToken, io: &mut IoContext<NetworkIoMessage<Message>>) { fn start_session(&mut self, token: StreamToken, io: &mut IoContext<NetworkIoMessage<Message>>) {
let info = &self.info; let info = &self.info;
// TODO: use slab::replace_with (currently broken)
match self.connections.remove(Token(token)) {
Some(ConnectionEntry::Handshake(h)) => {
match Session::new(h, io.event_loop, info) {
Ok(session) => {
assert!(Token(token) == self.connections.insert(ConnectionEntry::Session(session)).ok().unwrap());
},
Err(e) => {
debug!(target: "net", "Session construction error: {:?}", e);
}
}
},
_ => panic!("Error updating slab with session")
}
/*
self.connections.replace_with(Token(token), |c| { self.connections.replace_with(Token(token), |c| {
match c { match c {
ConnectionEntry::Handshake(h) => Session::new(h, io.event_loop, info) ConnectionEntry::Handshake(h) => Session::new(h, io.event_loop, info)
@ -480,6 +511,7 @@ impl<Message> Host<Message> where Message: Send {
_ => { panic!("No handshake to create a session from"); } _ => { panic!("No handshake to create a session from"); }
} }
}).expect("Error updating slab with session"); }).expect("Error updating slab with session");
*/
} }
fn connection_timeout<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage<Message>>) { fn connection_timeout<'s>(&'s mut self, token: StreamToken, io: &mut IoContext<'s, NetworkIoMessage<Message>>) {
@ -504,6 +536,7 @@ impl<Message> Host<Message> where Message: Send {
h.disconnected(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token); h.disconnected(&mut NetworkContext::new(io, p, Some(token), &mut self.connections, &mut self.timers), &token);
} }
self.connections.remove(Token(token)); self.connections.remove(Token(token));
warn!(target: "slab", "removed {}", token);
} }
} }
@ -518,7 +551,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
// Start listening for incoming connections // Start listening for incoming connections
io.event_loop.register(&self.listener, Token(TCP_ACCEPT), EventSet::readable(), PollOpt::edge()).unwrap(); io.event_loop.register(&self.listener, Token(TCP_ACCEPT), EventSet::readable(), PollOpt::edge()).unwrap();
io.event_loop.timeout_ms(Token(IDLE), 1000).unwrap(); //TODO: check delay io.event_loop.timeout_ms(Token(IDLE), MAINTENANCE_TIMEOUT).unwrap();
// open the udp socket // open the udp socket
io.event_loop.register(&self.udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap(); io.event_loop.register(&self.udp_socket, Token(NODETABLE_RECEIVE), EventSet::readable(), PollOpt::edge()).unwrap();
io.event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap(); io.event_loop.timeout_ms(Token(NODETABLE_MAINTAIN), 7200).unwrap();