Stream deregistration
This commit is contained in:
parent
b4bfdf2fd3
commit
acbb50d700
@ -74,6 +74,8 @@ pub trait IoHandler<Message>: Send + Sync where Message: Send + Sync + Clone + '
|
|||||||
fn register_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
|
fn register_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
|
||||||
/// Re-register a stream with the event loop
|
/// Re-register a stream with the event loop
|
||||||
fn update_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
|
fn update_stream(&self, _stream: StreamToken, _reg: Token, _event_loop: &mut EventLoop<IoManager<Message>>) {}
|
||||||
|
/// Deregister a stream. Called whenstream is removed from event loop
|
||||||
|
fn deregister_stream(&self, _stream: StreamToken, _event_loop: &mut EventLoop<IoManager<Message>>) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// TODO [arkpar] Please document me
|
/// TODO [arkpar] Please document me
|
||||||
|
@ -42,6 +42,10 @@ pub enum IoMessage<Message> where Message: Send + Clone + Sized {
|
|||||||
handler_id: HandlerId,
|
handler_id: HandlerId,
|
||||||
token: StreamToken,
|
token: StreamToken,
|
||||||
},
|
},
|
||||||
|
DeregisterStream {
|
||||||
|
handler_id: HandlerId,
|
||||||
|
token: StreamToken,
|
||||||
|
},
|
||||||
UpdateStreamRegistration {
|
UpdateStreamRegistration {
|
||||||
handler_id: HandlerId,
|
handler_id: HandlerId,
|
||||||
token: StreamToken,
|
token: StreamToken,
|
||||||
@ -83,6 +87,7 @@ impl<Message> IoContext<Message> where Message: Send + Clone + 'static {
|
|||||||
}));
|
}));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Register a new IO stream.
|
/// Register a new IO stream.
|
||||||
pub fn register_stream(&self, token: StreamToken) -> Result<(), UtilError> {
|
pub fn register_stream(&self, token: StreamToken) -> Result<(), UtilError> {
|
||||||
try!(self.channel.send_io(IoMessage::RegisterStream {
|
try!(self.channel.send_io(IoMessage::RegisterStream {
|
||||||
@ -92,6 +97,15 @@ impl<Message> IoContext<Message> where Message: Send + Clone + 'static {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Deregister an IO stream.
|
||||||
|
pub fn deregister_stream(&self, token: StreamToken) -> Result<(), UtilError> {
|
||||||
|
try!(self.channel.send_io(IoMessage::DeregisterStream {
|
||||||
|
token: token,
|
||||||
|
handler_id: self.handler,
|
||||||
|
}));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Reregister an IO stream.
|
/// Reregister an IO stream.
|
||||||
pub fn update_registration(&self, token: StreamToken) -> Result<(), UtilError> {
|
pub fn update_registration(&self, token: StreamToken) -> Result<(), UtilError> {
|
||||||
try!(self.channel.send_io(IoMessage::UpdateStreamRegistration {
|
try!(self.channel.send_io(IoMessage::UpdateStreamRegistration {
|
||||||
@ -214,6 +228,10 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
|
|||||||
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
|
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
|
||||||
handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
|
handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
|
||||||
},
|
},
|
||||||
|
IoMessage::DeregisterStream { handler_id, token } => {
|
||||||
|
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
|
||||||
|
handler.deregister_stream(token, event_loop);
|
||||||
|
},
|
||||||
IoMessage::UpdateStreamRegistration { handler_id, token } => {
|
IoMessage::UpdateStreamRegistration { handler_id, token } => {
|
||||||
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
|
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
|
||||||
handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
|
handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
|
||||||
|
@ -150,6 +150,13 @@ impl Connection {
|
|||||||
Err(e)
|
Err(e)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete connection registration. Should be called at the end of the IO handler.
|
||||||
|
pub fn deregister_socket<Host: Handler>(&self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
|
||||||
|
trace!(target: "net", "connection deregister; token={:?}", self.token);
|
||||||
|
event_loop.deregister(&self.socket).ok(); // ignore errors here
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// RLPx packet
|
/// RLPx packet
|
||||||
@ -371,6 +378,12 @@ impl EncryptedConnection {
|
|||||||
try!(self.connection.update_socket(reg, event_loop));
|
try!(self.connection.update_socket(reg, event_loop));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete connection registration. This should be called at the end of the event loop.
|
||||||
|
pub fn deregister_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
||||||
|
try!(self.connection.deregister_socket(event_loop));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -134,6 +134,12 @@ impl Handshake {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete registration
|
||||||
|
pub fn deregister_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
||||||
|
try!(self.connection.deregister_socket(event_loop));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Parse, validate and confirm auth message
|
/// Parse, validate and confirm auth message
|
||||||
fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
|
fn read_auth(&mut self, host: &HostInfo, data: &[u8]) -> Result<(), UtilError> {
|
||||||
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
|
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
|
||||||
|
@ -285,7 +285,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
host.add_node("enode://de471bccee3d042261d52e9bff31458daecc406142b401d4cd848f677479f73104b9fdeb090af9583d3391b7f10cb2ba9e26865dd5fca4fcdc0fb1e3b723c786@54.94.239.50:30303"); // BR
|
host.add_node("enode://de471bccee3d042261d52e9bff31458daecc406142b401d4cd848f677479f73104b9fdeb090af9583d3391b7f10cb2ba9e26865dd5fca4fcdc0fb1e3b723c786@54.94.239.50:30303"); // BR
|
||||||
host.add_node("enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303"); // SG
|
host.add_node("enode://1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082@52.74.57.123:30303"); // SG
|
||||||
// ETH/DEV cpp-ethereum (poc-9.ethdev.com)
|
// ETH/DEV cpp-ethereum (poc-9.ethdev.com)
|
||||||
host.add_node("enode://979b7fa28feeb35a4741660a16076f1943202cb72b6af70d327f053e248bab9ba81760f39d0701ef1d8f89cc1fbd2cacba0710a12cd5314d5e0c9021aa3637f9@5.1.83.226:30303");
|
//host.add_node("enode://979b7fa28feeb35a4741660a16076f1943202cb72b6af70d327f053e248bab9ba81760f39d0701ef1d8f89cc1fbd2cacba0710a12cd5314d5e0c9021aa3637f9@5.1.83.226:30303");
|
||||||
host
|
host
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -395,7 +395,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let nonce = self.info.write().unwrap().next_nonce();
|
let nonce = self.info.write().unwrap().next_nonce();
|
||||||
if self.connections.write().unwrap().insert_with(|token| {
|
let mut connections = self.connections.write().unwrap();
|
||||||
|
if connections.insert_with(|token| {
|
||||||
let mut handshake = Handshake::new(token, id, socket, &nonce).expect("Can't create handshake");
|
let mut handshake = Handshake::new(token, id, socket, &nonce).expect("Can't create handshake");
|
||||||
handshake.start(io, &self.info.read().unwrap(), true).and_then(|_| io.register_stream(token)).unwrap_or_else (|e| {
|
handshake.start(io, &self.info.read().unwrap(), true).and_then(|_| io.register_stream(token)).unwrap_or_else (|e| {
|
||||||
debug!(target: "net", "Handshake create error: {:?}", e);
|
debug!(target: "net", "Handshake create error: {:?}", e);
|
||||||
@ -552,6 +553,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
_ => {},
|
_ => {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
io.deregister_stream(token).expect("Error deregistering stream");
|
||||||
}
|
}
|
||||||
for p in to_disconnect {
|
for p in to_disconnect {
|
||||||
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
|
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
|
||||||
@ -664,6 +666,24 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn deregister_stream(&self, stream: StreamToken, event_loop: &mut EventLoop<IoManager<NetworkIoMessage<Message>>>) {
|
||||||
|
match stream {
|
||||||
|
FIRST_CONNECTION ... LAST_CONNECTION => {
|
||||||
|
let mut connections = self.connections.write().unwrap();
|
||||||
|
if let Some(connection) = connections.get(stream).cloned() {
|
||||||
|
match *connection.lock().unwrap().deref() {
|
||||||
|
ConnectionEntry::Handshake(ref h) => h.deregister_socket(event_loop).expect("Error deregistering socket"),
|
||||||
|
ConnectionEntry::Session(ref s) => s.deregister_socket(event_loop).expect("Error deregistering session socket"),
|
||||||
|
}
|
||||||
|
connections.remove(stream);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
NODETABLE_RECEIVE => event_loop.deregister(self.udp_socket.lock().unwrap().deref()).unwrap(),
|
||||||
|
TCP_ACCEPT => event_loop.deregister(self.tcp_listener.lock().unwrap().deref()).unwrap(),
|
||||||
|
_ => warn!("Unexpected stream deregistration")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn update_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop<IoManager<NetworkIoMessage<Message>>>) {
|
fn update_stream(&self, stream: StreamToken, reg: Token, event_loop: &mut EventLoop<IoManager<NetworkIoMessage<Message>>>) {
|
||||||
match stream {
|
match stream {
|
||||||
FIRST_CONNECTION ... LAST_CONNECTION => {
|
FIRST_CONNECTION ... LAST_CONNECTION => {
|
||||||
|
@ -131,6 +131,11 @@ impl Session {
|
|||||||
self.connection.update_socket(reg, event_loop)
|
self.connection.update_socket(reg, event_loop)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete registration
|
||||||
|
pub fn deregister_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
|
||||||
|
self.connection.deregister_socket(event_loop)
|
||||||
|
}
|
||||||
|
|
||||||
/// Send a protocol packet to peer.
|
/// Send a protocol packet to peer.
|
||||||
pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> {
|
pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> {
|
||||||
let mut i = 0usize;
|
let mut i = 0usize;
|
||||||
|
Loading…
Reference in New Issue
Block a user