use Io queue for messages

This commit is contained in:
keorn 2016-11-28 15:42:36 +00:00
parent 1326c6cf5a
commit b454f7e307
2 changed files with 16 additions and 3 deletions

View File

@ -562,6 +562,13 @@ impl Client {
results.len() results.len()
} }
/// Handle messages from the IO queue
pub fn handle_queued_message(&self, message: &Bytes) {
if let Err(e) = self.engine.handle_message(UntrustedRlp::new(message)) {
trace!(target: "poa", "Invalid message received: {}", e);
}
}
/// Used by PoA to try sealing on period change. /// Used by PoA to try sealing on period change.
pub fn update_sealing(&self) { pub fn update_sealing(&self) {
self.miner.update_sealing(self) self.miner.update_sealing(self)
@ -1229,9 +1236,10 @@ impl BlockChainClient for Client {
self.miner.pending_transactions(self.chain.read().best_block_number()) self.miner.pending_transactions(self.chain.read().best_block_number())
} }
// TODO: Make it an actual queue, return errors.
fn queue_infinity_message(&self, message: Bytes) { fn queue_infinity_message(&self, message: Bytes) {
self.engine.handle_message(UntrustedRlp::new(&message)); if let Err(e) = self.io_channel.lock().send(ClientIoMessage::NewMessage(message)) {
debug!("Ignoring the message, error queueing: {}", e);
}
} }
fn signing_network_id(&self) -> Option<u8> { fn signing_network_id(&self) -> Option<u8> {

View File

@ -53,7 +53,9 @@ pub enum ClientIoMessage {
/// Submit seal (useful for internal sealing). /// Submit seal (useful for internal sealing).
SubmitSeal(H256, Vec<Bytes>), SubmitSeal(H256, Vec<Bytes>),
/// Broadcast a message to the network. /// Broadcast a message to the network.
BroadcastMessage(Bytes) BroadcastMessage(Bytes),
/// New consensus message received.
NewMessage(Bytes)
} }
/// Client service setup. Creates and registers client and network services with the IO subsystem. /// Client service setup. Creates and registers client and network services with the IO subsystem.
@ -234,6 +236,9 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
trace!(target: "poa", "message: BroadcastMessage"); trace!(target: "poa", "message: BroadcastMessage");
self.client.broadcast_message(message.clone()); self.client.broadcast_message(message.clone());
}, },
ClientIoMessage::NewMessage(ref message) => {
self.client.handle_queued_message(message);
},
_ => {} // ignore other messages _ => {} // ignore other messages
} }
} }