From ec058cdb50875ef28c3799bf7052ec05e3b617ae Mon Sep 17 00:00:00 2001 From: keorn Date: Tue, 27 Sep 2016 12:12:18 +0200 Subject: [PATCH] reseal on timeout --- ethcore/src/client/client.rs | 11 ++++++++-- ethcore/src/engines/authority_round.rs | 29 ++++++++++++++++++-------- ethcore/src/engines/mod.rs | 4 ++++ ethcore/src/service.rs | 12 +++++++++-- 4 files changed, 43 insertions(+), 13 deletions(-) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index ad2c3fc8b..e90f92be6 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -488,6 +488,11 @@ impl Client { self.miner.set_author(author) } + /// Used by PoA to try sealing on period change. + pub fn update_sealing(&self) { + self.miner.update_sealing(self) + } + /// Attempt to get a copy of a specific block's final state. /// /// This will not fail if given BlockID::Latest. @@ -1025,13 +1030,15 @@ impl BlockChainClient for Client { } fn queue_transactions(&self, transactions: Vec) { - if self.queue_transactions.load(AtomicOrdering::Relaxed) > MAX_TX_QUEUE_SIZE { + let queue_size = self.queue_transactions.load(AtomicOrdering::Relaxed); + trace!(target: "external_tx", "Queue size: {}", queue_size); + if queue_size > MAX_TX_QUEUE_SIZE { debug!("Ignoring {} transactions: queue is full", transactions.len()); } else { let len = transactions.len(); match self.io_channel.send(ClientIoMessage::NewTransactions(transactions)) { Ok(_) => { - trace!(target: "external_tx", "Sending IoMessage"); + trace!(target: "external_tx", "Sent IoMessage"); self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst); } Err(e) => { diff --git a/ethcore/src/engines/authority_round.rs b/ethcore/src/engines/authority_round.rs index c2fa89e90..951f7f06f 100644 --- a/ethcore/src/engines/authority_round.rs +++ b/ethcore/src/engines/authority_round.rs @@ -27,7 +27,8 @@ use spec::CommonParams; use engines::Engine; use evm::Schedule; use ethjson; -use io::{IoContext, IoHandler, TimerToken, IoService}; +use io::{IoContext, IoHandler, TimerToken, IoService, IoChannel}; +use service::ClientIoMessage; use time::get_time; /// `AuthorityRound` params. @@ -61,6 +62,7 @@ pub struct AuthorityRound { our_params: AuthorityRoundParams, builtins: BTreeMap, transistion_service: IoService, + message_channel: Mutex>>, step: AtomicUsize, } @@ -73,6 +75,7 @@ impl AuthorityRound { our_params: our_params, builtins: builtins, transistion_service: IoService::::start().expect("Error creating engine timeout service"), + message_channel: Mutex::new(None), step: AtomicUsize::new(0), }); let handler = TransitionHandler { engine: Arc::downgrade(&engine) }; @@ -115,19 +118,22 @@ impl IoHandler for TransitionHandler { if let Some(engine) = self.engine.upgrade() { debug!(target: "authorityround", "Timeout step: {}", engine.step.load(AtomicOrdering::Relaxed)); engine.step.fetch_add(1, AtomicOrdering::SeqCst); + if let Some(ref channel) = *engine.message_channel.try_lock().unwrap() { + channel.send(ClientIoMessage::UpdateSealing); + } io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.our_params.step_duration).expect("Failed to restart consensus step timer.") } } } - fn message(&self, io: &IoContext, _net_message: &BlockArrived) { - if let Some(engine) = self.engine.upgrade() { - trace!(target: "authorityround", "Message: {:?}", get_time().sec); - engine.step.fetch_add(1, AtomicOrdering::SeqCst); - io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to restart consensus step timer."); - io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.our_params.step_duration).expect("Failed to restart consensus step timer.") - } - } +// fn message(&self, io: &IoContext, _net_message: &BlockArrived) { +// if let Some(engine) = self.engine.upgrade() { +// trace!(target: "authorityround", "Message: {:?}", get_time().sec); +// engine.step.fetch_add(1, AtomicOrdering::SeqCst); +// io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to restart consensus step timer."); +// io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.our_params.step_duration).expect("Failed to restart consensus step timer.") +// } +// } } impl Engine for AuthorityRound { @@ -248,6 +254,11 @@ impl Engine for AuthorityRound { fn verify_transaction(&self, t: &SignedTransaction, _header: &Header) -> Result<(), Error> { t.sender().map(|_|()) // Perform EC recovery and cache sender } + + fn register_message_channel(&self, message_channel: IoChannel) { + let mut guard = self.message_channel.try_lock().unwrap(); + *guard = Some(message_channel); + } } impl Header { diff --git a/ethcore/src/engines/mod.rs b/ethcore/src/engines/mod.rs index ec27a2fe2..518eab943 100644 --- a/ethcore/src/engines/mod.rs +++ b/ethcore/src/engines/mod.rs @@ -31,6 +31,8 @@ use account_provider::AccountProvider; use block::ExecutedBlock; use spec::CommonParams; use evm::Schedule; +use io::IoChannel; +use service::ClientIoMessage; /// A consensus mechanism for the chain. Generally either proof-of-work or proof-of-stake-based. /// Provides hooks into each of the major parts of block import. @@ -130,5 +132,7 @@ pub trait Engine : Sync + Send { /// Panics if `is_builtin(a)` is not true. fn execute_builtin(&self, a: &Address, input: &[u8], output: &mut [u8]) { self.builtins().get(a).unwrap().execute(input, output); } + /// Add a channel for communication with Client. + fn register_message_channel(&self, message_channel: IoChannel) {} // TODO: sealing stuff - though might want to leave this for later. } diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 9fa126cc7..8b1048393 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -21,7 +21,7 @@ use io::*; use spec::Spec; use error::*; use client::{Client, ClientConfig, ChainNotify}; -use miner::Miner; +use miner::{Miner, MinerService}; use snapshot::ManifestData; use snapshot::service::{Service as SnapshotService, ServiceParams as SnapServiceParams}; use std::sync::atomic::AtomicBool; @@ -48,6 +48,8 @@ pub enum ClientIoMessage { FeedBlockChunk(H256, Bytes), /// Take a snapshot for the block with given number. TakeSnapshot(u64), + /// Trigger sealing update (useful for internal sealing). + UpdateSealing, } /// Client service setup. Creates and registers client and network services with the IO subsystem. @@ -105,6 +107,8 @@ impl ClientService { }); try!(io_service.register_handler(client_io)); + spec.engine.register_message_channel(io_service.channel()); + let stop_guard = ::devtools::StopGuard::new(); run_ipc(ipc_path, client.clone(), snapshot.clone(), stop_guard.share()); @@ -196,7 +200,11 @@ impl IoHandler for ClientIoHandler { if let Err(e) = self.snapshot.take_snapshot(&*self.client, num) { warn!("Failed to take snapshot at block #{}: {}", num, e); } - } + }, + ClientIoMessage::UpdateSealing => { + println!("Message received!"); + self.client.update_sealing() + }, _ => {} // ignore other messages } }