diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index e89bc8921..b6976a933 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -562,6 +562,13 @@ impl Client { results.len() } + /// Handle messages from the IO queue + pub fn handle_queued_message(&self, message: &Bytes) { + if let Err(e) = self.engine.handle_message(UntrustedRlp::new(message)) { + trace!(target: "poa", "Invalid message received: {}", e); + } + } + /// Used by PoA to try sealing on period change. pub fn update_sealing(&self) { self.miner.update_sealing(self) @@ -1229,9 +1236,10 @@ impl BlockChainClient for Client { self.miner.pending_transactions(self.chain.read().best_block_number()) } - // TODO: Make it an actual queue, return errors. fn queue_infinity_message(&self, message: Bytes) { - self.engine.handle_message(UntrustedRlp::new(&message)); + if let Err(e) = self.io_channel.lock().send(ClientIoMessage::NewMessage(message)) { + debug!("Ignoring the message, error queueing: {}", e); + } } fn signing_network_id(&self) -> Option { diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index b3e1e0d51..c7dccaa89 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -53,7 +53,9 @@ pub enum ClientIoMessage { /// Submit seal (useful for internal sealing). SubmitSeal(H256, Vec), /// Broadcast a message to the network. - BroadcastMessage(Bytes) + BroadcastMessage(Bytes), + /// New consensus message received. + NewMessage(Bytes) } /// Client service setup. Creates and registers client and network services with the IO subsystem. @@ -234,6 +236,9 @@ impl IoHandler for ClientIoHandler { trace!(target: "poa", "message: BroadcastMessage"); self.client.broadcast_message(message.clone()); }, + ClientIoMessage::NewMessage(ref message) => { + self.client.handle_queued_message(message); + }, _ => {} // ignore other messages } }