Reduced IO messages; removed panics on IO notifications (#1457)
This commit is contained in:
parent
1fdbfa14ad
commit
60b70dada1
@ -359,7 +359,7 @@ impl<V> Client<V> where V: Verifier {
|
|||||||
invalid: invalid_blocks,
|
invalid: invalid_blocks,
|
||||||
enacted: enacted,
|
enacted: enacted,
|
||||||
retracted: retracted,
|
retracted: retracted,
|
||||||
})).unwrap();
|
})).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,11 +160,13 @@ impl SyncProvider for EthSync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn start_network(&self) {
|
fn start_network(&self) {
|
||||||
self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StartNetwork)).expect("Error sending IO notification");
|
self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StartNetwork))
|
||||||
|
.unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop_network(&self) {
|
fn stop_network(&self) {
|
||||||
self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StopNetwork)).expect("Error sending IO notification");
|
self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StopNetwork))
|
||||||
|
.unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,8 +135,9 @@ impl<Message> IoContext<Message> where Message: Send + Clone + 'static {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Broadcast a message to other IO clients
|
/// Broadcast a message to other IO clients
|
||||||
pub fn message(&self, message: Message) {
|
pub fn message(&self, message: Message) -> Result<(), UtilError> {
|
||||||
self.channel.send(message).expect("Error seding message");
|
try!(self.channel.send(message));
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get message channel
|
/// Get message channel
|
||||||
@ -351,7 +352,9 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
|
|||||||
/// Starts IO event loop
|
/// Starts IO event loop
|
||||||
pub fn start() -> Result<IoService<Message>, UtilError> {
|
pub fn start() -> Result<IoService<Message>, UtilError> {
|
||||||
let panic_handler = PanicHandler::new_in_arc();
|
let panic_handler = PanicHandler::new_in_arc();
|
||||||
let mut event_loop = EventLoop::new().unwrap();
|
let mut config = EventLoopConfig::new();
|
||||||
|
config.messages_per_tick(1024);
|
||||||
|
let mut event_loop = EventLoop::configured(config).expect("Error creating event loop");
|
||||||
let channel = event_loop.channel();
|
let channel = event_loop.channel();
|
||||||
let panic = panic_handler.clone();
|
let panic = panic_handler.clone();
|
||||||
let thread = thread::spawn(move || {
|
let thread = thread::spawn(move || {
|
||||||
@ -390,7 +393,7 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
|
|||||||
impl<Message> Drop for IoService<Message> where Message: Send + Sync + Clone {
|
impl<Message> Drop for IoService<Message> where Message: Send + Sync + Clone {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
trace!(target: "shutdown", "[IoService] Closing...");
|
trace!(target: "shutdown", "[IoService] Closing...");
|
||||||
self.host_channel.send(IoMessage::Shutdown).unwrap();
|
self.host_channel.send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e));
|
||||||
self.thread.take().unwrap().join().ok();
|
self.thread.take().unwrap().join().ok();
|
||||||
trace!(target: "shutdown", "[IoService] Closed.");
|
trace!(target: "shutdown", "[IoService] Closed.");
|
||||||
}
|
}
|
||||||
|
@ -28,7 +28,7 @@ use crypto::*;
|
|||||||
use rlp::*;
|
use rlp::*;
|
||||||
use network::node_table::*;
|
use network::node_table::*;
|
||||||
use network::error::NetworkError;
|
use network::error::NetworkError;
|
||||||
use io::StreamToken;
|
use io::{StreamToken, IoContext};
|
||||||
|
|
||||||
use network::PROTOCOL_VERSION;
|
use network::PROTOCOL_VERSION;
|
||||||
|
|
||||||
@ -283,7 +283,7 @@ impl Discovery {
|
|||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn writable(&mut self) {
|
pub fn writable<Message>(&mut self, io: &IoContext<Message>) where Message: Send + Sync + Clone {
|
||||||
while !self.send_queue.is_empty() {
|
while !self.send_queue.is_empty() {
|
||||||
let data = self.send_queue.pop_front().unwrap();
|
let data = self.send_queue.pop_front().unwrap();
|
||||||
match self.udp_socket.send_to(&data.payload, &data.address) {
|
match self.udp_socket.send_to(&data.payload, &data.address) {
|
||||||
@ -302,15 +302,17 @@ impl Discovery {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_to(&mut self, payload: Bytes, address: SocketAddr) {
|
fn send_to(&mut self, payload: Bytes, address: SocketAddr) {
|
||||||
self.send_queue.push_back(Datagramm { payload: payload, address: address });
|
self.send_queue.push_back(Datagramm { payload: payload, address: address });
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn readable(&mut self) -> Option<TableUpdates> {
|
pub fn readable<Message>(&mut self, io: &IoContext<Message>) -> Option<TableUpdates> where Message: Send + Sync + Clone {
|
||||||
let mut buf: [u8; MAX_DATAGRAM_SIZE] = unsafe { mem::uninitialized() };
|
let mut buf: [u8; MAX_DATAGRAM_SIZE] = unsafe { mem::uninitialized() };
|
||||||
match self.udp_socket.recv_from(&mut buf) {
|
let writable = !self.send_queue.is_empty();
|
||||||
|
let res = match self.udp_socket.recv_from(&mut buf) {
|
||||||
Ok(Some((len, address))) => self.on_packet(&buf[0..len], address).unwrap_or_else(|e| {
|
Ok(Some((len, address))) => self.on_packet(&buf[0..len], address).unwrap_or_else(|e| {
|
||||||
debug!("Error processing UDP packet: {:?}", e);
|
debug!("Error processing UDP packet: {:?}", e);
|
||||||
None
|
None
|
||||||
@ -320,7 +322,12 @@ impl Discovery {
|
|||||||
debug!("Error reading UPD socket: {:?}", e);
|
debug!("Error reading UPD socket: {:?}", e);
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
let new_writable = !self.send_queue.is_empty();
|
||||||
|
if writable != new_writable {
|
||||||
|
io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
|
||||||
}
|
}
|
||||||
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
|
fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result<Option<TableUpdates>, NetworkError> {
|
||||||
|
@ -236,8 +236,8 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Send an IO message
|
/// Send an IO message
|
||||||
pub fn message(&self, msg: Message) {
|
pub fn message(&self, msg: Message) -> Result<(), UtilError> {
|
||||||
self.io.message(NetworkIoMessage::User(msg));
|
self.io.message(NetworkIoMessage::User(msg))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get an IoChannel.
|
/// Get an IoChannel.
|
||||||
@ -248,12 +248,14 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
|
|||||||
/// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected.
|
/// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected.
|
||||||
pub fn disable_peer(&self, peer: PeerId) {
|
pub fn disable_peer(&self, peer: PeerId) {
|
||||||
//TODO: remove capability, disconnect if no capabilities left
|
//TODO: remove capability, disconnect if no capabilities left
|
||||||
self.io.message(NetworkIoMessage::DisablePeer(peer));
|
self.io.message(NetworkIoMessage::DisablePeer(peer))
|
||||||
|
.unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Disconnect peer. Reconnect can be attempted later.
|
/// Disconnect peer. Reconnect can be attempted later.
|
||||||
pub fn disconnect_peer(&self, peer: PeerId) {
|
pub fn disconnect_peer(&self, peer: PeerId) {
|
||||||
self.io.message(NetworkIoMessage::Disconnect(peer));
|
self.io.message(NetworkIoMessage::Disconnect(peer))
|
||||||
|
.unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if the session is still active.
|
/// Check if the session is still active.
|
||||||
@ -267,7 +269,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
|
|||||||
token: token,
|
token: token,
|
||||||
delay: ms,
|
delay: ms,
|
||||||
protocol: self.protocol,
|
protocol: self.protocol,
|
||||||
});
|
}).unwrap_or_else(|e| warn!("Error sending network IO message: {:?}", e));
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -714,7 +716,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
|
|||||||
debug!(target: "network", "Can't accept connection: {:?}", e);
|
debug!(target: "network", "Can't accept connection: {:?}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
io.update_registration(TCP_ACCEPT).expect("Error registering TCP listener");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn session_writable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
|
fn session_writable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
|
||||||
@ -910,11 +911,10 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
match stream {
|
match stream {
|
||||||
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
|
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
|
||||||
DISCOVERY => {
|
DISCOVERY => {
|
||||||
let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().readable() };
|
let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().readable(io) };
|
||||||
if let Some(node_changes) = node_changes {
|
if let Some(node_changes) = node_changes {
|
||||||
self.update_nodes(io, node_changes);
|
self.update_nodes(io, node_changes);
|
||||||
}
|
}
|
||||||
io.update_registration(DISCOVERY).expect("Error updating discovery registration");
|
|
||||||
},
|
},
|
||||||
TCP_ACCEPT => self.accept(io),
|
TCP_ACCEPT => self.accept(io),
|
||||||
_ => panic!("Received unknown readable token"),
|
_ => panic!("Received unknown readable token"),
|
||||||
@ -928,8 +928,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
match stream {
|
match stream {
|
||||||
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
|
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
|
||||||
DISCOVERY => {
|
DISCOVERY => {
|
||||||
self.discovery.lock().unwrap().as_mut().unwrap().writable();
|
self.discovery.lock().unwrap().as_mut().unwrap().writable(io);
|
||||||
io.update_registration(DISCOVERY).expect("Error updating discovery registration");
|
|
||||||
}
|
}
|
||||||
_ => panic!("Received unknown writable token"),
|
_ => panic!("Received unknown writable token"),
|
||||||
}
|
}
|
||||||
@ -946,14 +945,14 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
|
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
|
||||||
DISCOVERY_REFRESH => {
|
DISCOVERY_REFRESH => {
|
||||||
self.discovery.lock().unwrap().as_mut().unwrap().refresh();
|
self.discovery.lock().unwrap().as_mut().unwrap().refresh();
|
||||||
io.update_registration(DISCOVERY).expect("Error updating discovery registration");
|
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
|
||||||
},
|
},
|
||||||
DISCOVERY_ROUND => {
|
DISCOVERY_ROUND => {
|
||||||
let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().round() };
|
let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().round() };
|
||||||
if let Some(node_changes) = node_changes {
|
if let Some(node_changes) = node_changes {
|
||||||
self.update_nodes(io, node_changes);
|
self.update_nodes(io, node_changes);
|
||||||
}
|
}
|
||||||
io.update_registration(DISCOVERY).expect("Error updating discovery registration");
|
io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e));
|
||||||
},
|
},
|
||||||
NODE_TABLE => {
|
NODE_TABLE => {
|
||||||
trace!(target: "network", "Refreshing node table");
|
trace!(target: "network", "Refreshing node table");
|
||||||
@ -1004,7 +1003,7 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
|
|||||||
handler_token
|
handler_token
|
||||||
};
|
};
|
||||||
self.timers.write().unwrap().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token });
|
self.timers.write().unwrap().insert(handler_token, ProtocolTimer { protocol: protocol, token: *token });
|
||||||
io.register_timer(handler_token, *delay).expect("Error registering timer");
|
io.register_timer(handler_token, *delay).unwrap_or_else(|e| debug!("Error registering timer {}: {:?}", token, e));
|
||||||
},
|
},
|
||||||
NetworkIoMessage::Disconnect(ref peer) => {
|
NetworkIoMessage::Disconnect(ref peer) => {
|
||||||
let session = { self.sessions.read().unwrap().get(*peer).cloned() };
|
let session = { self.sessions.read().unwrap().get(*peer).cloned() };
|
||||||
|
Loading…
Reference in New Issue
Block a user