reseal on timeout

This commit is contained in:
keorn 2016-09-27 12:12:18 +02:00
parent c57e3cefe4
commit ec058cdb50
4 changed files with 43 additions and 13 deletions

View File

@ -488,6 +488,11 @@ impl Client {
self.miner.set_author(author) self.miner.set_author(author)
} }
/// Used by PoA to try sealing on period change.
pub fn update_sealing(&self) {
self.miner.update_sealing(self)
}
/// Attempt to get a copy of a specific block's final state. /// Attempt to get a copy of a specific block's final state.
/// ///
/// This will not fail if given BlockID::Latest. /// This will not fail if given BlockID::Latest.
@ -1025,13 +1030,15 @@ impl BlockChainClient for Client {
} }
fn queue_transactions(&self, transactions: Vec<Bytes>) { fn queue_transactions(&self, transactions: Vec<Bytes>) {
if self.queue_transactions.load(AtomicOrdering::Relaxed) > MAX_TX_QUEUE_SIZE { let queue_size = self.queue_transactions.load(AtomicOrdering::Relaxed);
trace!(target: "external_tx", "Queue size: {}", queue_size);
if queue_size > MAX_TX_QUEUE_SIZE {
debug!("Ignoring {} transactions: queue is full", transactions.len()); debug!("Ignoring {} transactions: queue is full", transactions.len());
} else { } else {
let len = transactions.len(); let len = transactions.len();
match self.io_channel.send(ClientIoMessage::NewTransactions(transactions)) { match self.io_channel.send(ClientIoMessage::NewTransactions(transactions)) {
Ok(_) => { Ok(_) => {
trace!(target: "external_tx", "Sending IoMessage"); trace!(target: "external_tx", "Sent IoMessage");
self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst); self.queue_transactions.fetch_add(len, AtomicOrdering::SeqCst);
} }
Err(e) => { Err(e) => {

View File

@ -27,7 +27,8 @@ use spec::CommonParams;
use engines::Engine; use engines::Engine;
use evm::Schedule; use evm::Schedule;
use ethjson; use ethjson;
use io::{IoContext, IoHandler, TimerToken, IoService}; use io::{IoContext, IoHandler, TimerToken, IoService, IoChannel};
use service::ClientIoMessage;
use time::get_time; use time::get_time;
/// `AuthorityRound` params. /// `AuthorityRound` params.
@ -61,6 +62,7 @@ pub struct AuthorityRound {
our_params: AuthorityRoundParams, our_params: AuthorityRoundParams,
builtins: BTreeMap<Address, Builtin>, builtins: BTreeMap<Address, Builtin>,
transistion_service: IoService<BlockArrived>, transistion_service: IoService<BlockArrived>,
message_channel: Mutex<Option<IoChannel<ClientIoMessage>>>,
step: AtomicUsize, step: AtomicUsize,
} }
@ -73,6 +75,7 @@ impl AuthorityRound {
our_params: our_params, our_params: our_params,
builtins: builtins, builtins: builtins,
transistion_service: IoService::<BlockArrived>::start().expect("Error creating engine timeout service"), transistion_service: IoService::<BlockArrived>::start().expect("Error creating engine timeout service"),
message_channel: Mutex::new(None),
step: AtomicUsize::new(0), step: AtomicUsize::new(0),
}); });
let handler = TransitionHandler { engine: Arc::downgrade(&engine) }; let handler = TransitionHandler { engine: Arc::downgrade(&engine) };
@ -115,19 +118,22 @@ impl IoHandler<BlockArrived> for TransitionHandler {
if let Some(engine) = self.engine.upgrade() { if let Some(engine) = self.engine.upgrade() {
debug!(target: "authorityround", "Timeout step: {}", engine.step.load(AtomicOrdering::Relaxed)); debug!(target: "authorityround", "Timeout step: {}", engine.step.load(AtomicOrdering::Relaxed));
engine.step.fetch_add(1, AtomicOrdering::SeqCst); engine.step.fetch_add(1, AtomicOrdering::SeqCst);
if let Some(ref channel) = *engine.message_channel.try_lock().unwrap() {
channel.send(ClientIoMessage::UpdateSealing);
}
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.our_params.step_duration).expect("Failed to restart consensus step timer.") io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.our_params.step_duration).expect("Failed to restart consensus step timer.")
} }
} }
} }
fn message(&self, io: &IoContext<BlockArrived>, _net_message: &BlockArrived) { // fn message(&self, io: &IoContext<BlockArrived>, _net_message: &BlockArrived) {
if let Some(engine) = self.engine.upgrade() { // if let Some(engine) = self.engine.upgrade() {
trace!(target: "authorityround", "Message: {:?}", get_time().sec); // trace!(target: "authorityround", "Message: {:?}", get_time().sec);
engine.step.fetch_add(1, AtomicOrdering::SeqCst); // engine.step.fetch_add(1, AtomicOrdering::SeqCst);
io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to restart consensus step timer."); // io.clear_timer(ENGINE_TIMEOUT_TOKEN).expect("Failed to restart consensus step timer.");
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.our_params.step_duration).expect("Failed to restart consensus step timer.") // io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.our_params.step_duration).expect("Failed to restart consensus step timer.")
} // }
} // }
} }
impl Engine for AuthorityRound { impl Engine for AuthorityRound {
@ -248,6 +254,11 @@ impl Engine for AuthorityRound {
fn verify_transaction(&self, t: &SignedTransaction, _header: &Header) -> Result<(), Error> { fn verify_transaction(&self, t: &SignedTransaction, _header: &Header) -> Result<(), Error> {
t.sender().map(|_|()) // Perform EC recovery and cache sender t.sender().map(|_|()) // Perform EC recovery and cache sender
} }
fn register_message_channel(&self, message_channel: IoChannel<ClientIoMessage>) {
let mut guard = self.message_channel.try_lock().unwrap();
*guard = Some(message_channel);
}
} }
impl Header { impl Header {

View File

@ -31,6 +31,8 @@ use account_provider::AccountProvider;
use block::ExecutedBlock; use block::ExecutedBlock;
use spec::CommonParams; use spec::CommonParams;
use evm::Schedule; use evm::Schedule;
use io::IoChannel;
use service::ClientIoMessage;
/// A consensus mechanism for the chain. Generally either proof-of-work or proof-of-stake-based. /// A consensus mechanism for the chain. Generally either proof-of-work or proof-of-stake-based.
/// Provides hooks into each of the major parts of block import. /// Provides hooks into each of the major parts of block import.
@ -130,5 +132,7 @@ pub trait Engine : Sync + Send {
/// Panics if `is_builtin(a)` is not true. /// Panics if `is_builtin(a)` is not true.
fn execute_builtin(&self, a: &Address, input: &[u8], output: &mut [u8]) { self.builtins().get(a).unwrap().execute(input, output); } fn execute_builtin(&self, a: &Address, input: &[u8], output: &mut [u8]) { self.builtins().get(a).unwrap().execute(input, output); }
/// Add a channel for communication with Client.
fn register_message_channel(&self, message_channel: IoChannel<ClientIoMessage>) {}
// TODO: sealing stuff - though might want to leave this for later. // TODO: sealing stuff - though might want to leave this for later.
} }

View File

@ -21,7 +21,7 @@ use io::*;
use spec::Spec; use spec::Spec;
use error::*; use error::*;
use client::{Client, ClientConfig, ChainNotify}; use client::{Client, ClientConfig, ChainNotify};
use miner::Miner; use miner::{Miner, MinerService};
use snapshot::ManifestData; use snapshot::ManifestData;
use snapshot::service::{Service as SnapshotService, ServiceParams as SnapServiceParams}; use snapshot::service::{Service as SnapshotService, ServiceParams as SnapServiceParams};
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
@ -48,6 +48,8 @@ pub enum ClientIoMessage {
FeedBlockChunk(H256, Bytes), FeedBlockChunk(H256, Bytes),
/// Take a snapshot for the block with given number. /// Take a snapshot for the block with given number.
TakeSnapshot(u64), TakeSnapshot(u64),
/// Trigger sealing update (useful for internal sealing).
UpdateSealing,
} }
/// Client service setup. Creates and registers client and network services with the IO subsystem. /// Client service setup. Creates and registers client and network services with the IO subsystem.
@ -105,6 +107,8 @@ impl ClientService {
}); });
try!(io_service.register_handler(client_io)); try!(io_service.register_handler(client_io));
spec.engine.register_message_channel(io_service.channel());
let stop_guard = ::devtools::StopGuard::new(); let stop_guard = ::devtools::StopGuard::new();
run_ipc(ipc_path, client.clone(), snapshot.clone(), stop_guard.share()); run_ipc(ipc_path, client.clone(), snapshot.clone(), stop_guard.share());
@ -196,7 +200,11 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
if let Err(e) = self.snapshot.take_snapshot(&*self.client, num) { if let Err(e) = self.snapshot.take_snapshot(&*self.client, num) {
warn!("Failed to take snapshot at block #{}: {}", num, e); warn!("Failed to take snapshot at block #{}: {}", num, e);
} }
} },
ClientIoMessage::UpdateSealing => {
println!("Message received!");
self.client.update_sealing()
},
_ => {} // ignore other messages _ => {} // ignore other messages
} }
} }