Merge branch 'engine-password' into auth-bft
This commit is contained in:
@@ -200,6 +200,7 @@ pub struct BlockChain {
|
||||
pending_block_details: RwLock<HashMap<H256, BlockDetails>>,
|
||||
pending_transaction_addresses: RwLock<HashMap<H256, Option<TransactionAddress>>>,
|
||||
|
||||
// Used for block ordering.
|
||||
engine: Arc<Engine>,
|
||||
}
|
||||
|
||||
|
||||
@@ -209,10 +209,6 @@ impl Engine for AuthorityRound {
|
||||
});
|
||||
}
|
||||
|
||||
/// Apply the block reward on finalisation of the block.
|
||||
/// This assumes that all uncles are valid uncles (i.e. of at least one generation before the current).
|
||||
fn on_close_block(&self, _block: &mut ExecutedBlock) {}
|
||||
|
||||
fn is_sealer(&self, author: &Address) -> Option<bool> {
|
||||
let p = &self.our_params;
|
||||
Some(p.authorities.contains(author))
|
||||
|
||||
@@ -100,13 +100,8 @@ impl Engine for BasicAuthority {
|
||||
max(gas_floor_target, gas_limit - gas_limit / bound_divisor + 1.into())
|
||||
}
|
||||
});
|
||||
// info!("ethash: populate_from_parent #{}: difficulty={} and gas_limit={}", header.number, header.difficulty, header.gas_limit);
|
||||
}
|
||||
|
||||
/// Apply the block reward on finalisation of the block.
|
||||
/// This assumes that all uncles are valid uncles (i.e. of at least one generation before the current).
|
||||
fn on_close_block(&self, _block: &mut ExecutedBlock) {}
|
||||
|
||||
fn is_sealer(&self, author: &Address) -> Option<bool> {
|
||||
Some(self.our_params.authorities.contains(author))
|
||||
}
|
||||
|
||||
@@ -184,17 +184,17 @@ pub trait Engine : Sync + Send {
|
||||
self.builtins().get(a).expect("attempted to execute nonexistent builtin").execute(input, output);
|
||||
}
|
||||
|
||||
/// Check if new block should be chosen as the one in chain.
|
||||
fn is_new_best_block(&self, best_total_difficulty: U256, _best_header: HeaderView, parent_details: &BlockDetails, new_header: &HeaderView) -> bool {
|
||||
ethash::is_new_best_block(best_total_difficulty, parent_details, new_header)
|
||||
}
|
||||
|
||||
/// Register an account which signs consensus messages.
|
||||
fn set_signer(&self, _address: Address, _password: String) {}
|
||||
|
||||
/// Add a channel for communication with Client which can be used for sealing.
|
||||
fn register_message_channel(&self, _message_channel: IoChannel<ClientIoMessage>) {}
|
||||
|
||||
/// Add an account provider useful for Engines that sign stuff.
|
||||
fn register_account_provider(&self, _account_provider: Arc<AccountProvider>) {}
|
||||
|
||||
/// Register an account which signs consensus messages.
|
||||
fn set_signer(&self, _address: Address, _password: String) {}
|
||||
|
||||
/// Check if new block should be chosen as the one in chain.
|
||||
fn is_new_best_block(&self, best_total_difficulty: U256, _best_header: HeaderView, parent_details: &BlockDetails, new_header: &HeaderView) -> bool {
|
||||
ethash::is_new_best_block(best_total_difficulty, parent_details, new_header)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -328,12 +328,16 @@ impl Engine for Ethash {
|
||||
t.sender().map(|_|()) // Perform EC recovery and cache sender
|
||||
}
|
||||
|
||||
/// Check if new block should be chosen as the one in chain.
|
||||
fn is_new_best_block(&self, best_total_difficulty: U256, _best_header: HeaderView, parent_details: &BlockDetails, new_header: &HeaderView) -> bool {
|
||||
is_new_best_block(best_total_difficulty, parent_details, new_header)
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if a new block should replace the best blockchain block.
|
||||
pub fn is_new_best_block(best_total_difficulty: U256, parent_details: &BlockDetails, new_header: &HeaderView) -> bool {
|
||||
parent_details.total_difficulty + new_header.difficulty() > best_total_difficulty
|
||||
}
|
||||
|
||||
#[cfg_attr(feature="dev", allow(wrong_self_convention))]
|
||||
impl Ethash {
|
||||
fn calculate_difficulty(&self, header: &Header, parent: &Header) -> U256 {
|
||||
@@ -409,12 +413,6 @@ impl Ethash {
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if a new block should replace the best blockchain block.
|
||||
pub fn is_new_best_block(best_total_difficulty: U256, parent_details: &BlockDetails, new_header: &HeaderView) -> bool {
|
||||
parent_details.total_difficulty + new_header.difficulty() > best_total_difficulty
|
||||
}
|
||||
|
||||
|
||||
impl Header {
|
||||
/// Get the none field of the header.
|
||||
pub fn nonce(&self) -> H64 {
|
||||
|
||||
@@ -737,7 +737,7 @@ impl MinerService for Miner {
|
||||
*self.author.write() = author;
|
||||
}
|
||||
|
||||
fn set_consensus_signer(&self, address: Address, password: String) -> Result<(), AccountError> {
|
||||
fn set_engine_signer(&self, address: Address, password: String) -> Result<(), AccountError> {
|
||||
if self.seals_internally {
|
||||
if let Some(ref ap) = self.accounts {
|
||||
try!(ap.sign(address.clone(), Some(password.clone()), Default::default()));
|
||||
|
||||
@@ -77,7 +77,7 @@ pub trait MinerService : Send + Sync {
|
||||
fn set_author(&self, author: Address);
|
||||
|
||||
/// Set info necessary to sign consensus messages.
|
||||
fn set_consensus_signer(&self, address: Address, password: String) -> Result<(), ::account_provider::Error>;
|
||||
fn set_engine_signer(&self, address: Address, password: String) -> Result<(), ::account_provider::Error>;
|
||||
|
||||
/// Get the extra_data that we will seal blocks with.
|
||||
fn extra_data(&self) -> Bytes;
|
||||
|
||||
@@ -82,6 +82,7 @@ struct Restoration {
|
||||
struct RestorationParams<'a> {
|
||||
manifest: ManifestData, // manifest to base restoration on.
|
||||
pruning: Algorithm, // pruning algorithm for the database.
|
||||
engine: Arc<Engine>, // consensus engine of the chain.
|
||||
db_path: PathBuf, // database path
|
||||
db_config: &'a DatabaseConfig, // configuration for the database.
|
||||
writer: Option<LooseWriter>, // writer for recovered snapshot.
|
||||
@@ -100,7 +101,7 @@ impl Restoration {
|
||||
let raw_db = Arc::new(try!(Database::open(params.db_config, &*params.db_path.to_string_lossy())
|
||||
.map_err(UtilError::SimpleString)));
|
||||
|
||||
let chain = BlockChain::new(Default::default(), params.genesis, raw_db.clone(), Spec::new_null().engine);
|
||||
let chain = BlockChain::new(Default::default(), params.genesis, raw_db.clone(), params.engine);
|
||||
let blocks = try!(BlockRebuilder::new(chain, raw_db.clone(), &manifest));
|
||||
|
||||
let root = manifest.state_root.clone();
|
||||
@@ -421,6 +422,7 @@ impl Service {
|
||||
let params = RestorationParams {
|
||||
manifest: manifest,
|
||||
pruning: self.pruning,
|
||||
engine: self.engine.clone(),
|
||||
db_path: self.restoration_db(),
|
||||
db_config: &self.db_config,
|
||||
writer: writer,
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
//! A queue of blocks. Sits between network or other I/O and the `BlockChain`.
|
||||
//! Sorts them ready for blockchain insertion.
|
||||
|
||||
use std::thread::{JoinHandle, self};
|
||||
use std::thread::{self, JoinHandle};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering as AtomicOrdering};
|
||||
use std::sync::{Condvar as SCondvar, Mutex as SMutex};
|
||||
use util::*;
|
||||
@@ -64,35 +64,11 @@ impl Default for Config {
|
||||
}
|
||||
}
|
||||
|
||||
struct VerifierHandle {
|
||||
deleting: Arc<AtomicBool>,
|
||||
sleep: Arc<AtomicBool>,
|
||||
thread: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl VerifierHandle {
|
||||
// signal to the verifier thread that it should sleep.
|
||||
fn sleep(&self) {
|
||||
self.sleep.store(true, AtomicOrdering::SeqCst);
|
||||
}
|
||||
|
||||
// signal to the verifier thread that it should wake up.
|
||||
fn wake_up(&self) {
|
||||
self.sleep.store(false, AtomicOrdering::SeqCst);
|
||||
self.thread.thread().unpark();
|
||||
}
|
||||
|
||||
// signal to the verifier thread that it should conclude its
|
||||
// operations.
|
||||
fn conclude(&self) {
|
||||
self.wake_up();
|
||||
self.deleting.store(true, AtomicOrdering::Release);
|
||||
}
|
||||
|
||||
// join the verifier thread.
|
||||
fn join(self) {
|
||||
self.thread.join().expect("Verifier thread panicked");
|
||||
}
|
||||
// pool states
|
||||
enum State {
|
||||
// all threads with id < inner value are to work.
|
||||
Work(usize),
|
||||
Exit,
|
||||
}
|
||||
|
||||
/// An item which is in the process of being verified.
|
||||
@@ -131,7 +107,6 @@ pub struct VerificationQueue<K: Kind> {
|
||||
engine: Arc<Engine>,
|
||||
more_to_verify: Arc<SCondvar>,
|
||||
verification: Arc<Verification<K>>,
|
||||
verifiers: Mutex<(Vec<VerifierHandle>, usize)>,
|
||||
deleting: Arc<AtomicBool>,
|
||||
ready_signal: Arc<QueueSignal>,
|
||||
empty: Arc<SCondvar>,
|
||||
@@ -139,6 +114,8 @@ pub struct VerificationQueue<K: Kind> {
|
||||
ticks_since_adjustment: AtomicUsize,
|
||||
max_queue_size: usize,
|
||||
max_mem_use: usize,
|
||||
verifier_handles: Vec<JoinHandle<()>>,
|
||||
state: Arc<(Mutex<State>, Condvar)>,
|
||||
}
|
||||
|
||||
struct QueueSignal {
|
||||
@@ -224,40 +201,39 @@ impl<K: Kind> VerificationQueue<K> {
|
||||
|
||||
let max_verifiers = min(::num_cpus::get(), MAX_VERIFIERS);
|
||||
let default_amount = max(::num_cpus::get(), 3) - 2;
|
||||
let mut verifiers = Vec::with_capacity(max_verifiers);
|
||||
let state = Arc::new((Mutex::new(State::Work(default_amount)), Condvar::new()));
|
||||
let mut verifier_handles = Vec::with_capacity(max_verifiers);
|
||||
|
||||
debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount);
|
||||
|
||||
for i in 0..max_verifiers {
|
||||
debug!(target: "verification", "Adding verification thread #{}", i);
|
||||
|
||||
let deleting = deleting.clone();
|
||||
let panic_handler = panic_handler.clone();
|
||||
let verification = verification.clone();
|
||||
let engine = engine.clone();
|
||||
let wait = more_to_verify.clone();
|
||||
let ready = ready_signal.clone();
|
||||
let empty = empty.clone();
|
||||
let state = state.clone();
|
||||
|
||||
// enable only the first few verifiers.
|
||||
let sleep = if i < default_amount {
|
||||
Arc::new(AtomicBool::new(false))
|
||||
} else {
|
||||
Arc::new(AtomicBool::new(true))
|
||||
};
|
||||
|
||||
verifiers.push(VerifierHandle {
|
||||
deleting: deleting.clone(),
|
||||
sleep: sleep.clone(),
|
||||
thread: thread::Builder::new()
|
||||
.name(format!("Verifier #{}", i))
|
||||
.spawn(move || {
|
||||
panic_handler.catch_panic(move || {
|
||||
VerificationQueue::verify(verification, engine, wait, ready, deleting, empty, sleep)
|
||||
}).unwrap()
|
||||
})
|
||||
.expect("Failed to create verifier thread.")
|
||||
});
|
||||
let handle = thread::Builder::new()
|
||||
.name(format!("Verifier #{}", i))
|
||||
.spawn(move || {
|
||||
panic_handler.catch_panic(move || {
|
||||
VerificationQueue::verify(
|
||||
verification,
|
||||
engine,
|
||||
wait,
|
||||
ready,
|
||||
empty,
|
||||
state,
|
||||
i,
|
||||
)
|
||||
}).unwrap()
|
||||
})
|
||||
.expect("Failed to create verifier thread.");
|
||||
verifier_handles.push(handle);
|
||||
}
|
||||
|
||||
VerificationQueue {
|
||||
@@ -266,13 +242,14 @@ impl<K: Kind> VerificationQueue<K> {
|
||||
ready_signal: ready_signal,
|
||||
more_to_verify: more_to_verify,
|
||||
verification: verification,
|
||||
verifiers: Mutex::new((verifiers, default_amount)),
|
||||
deleting: deleting,
|
||||
processing: RwLock::new(HashSet::new()),
|
||||
empty: empty,
|
||||
ticks_since_adjustment: AtomicUsize::new(0),
|
||||
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
|
||||
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
|
||||
verifier_handles: verifier_handles,
|
||||
state: state,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -281,23 +258,30 @@ impl<K: Kind> VerificationQueue<K> {
|
||||
engine: Arc<Engine>,
|
||||
wait: Arc<SCondvar>,
|
||||
ready: Arc<QueueSignal>,
|
||||
deleting: Arc<AtomicBool>,
|
||||
empty: Arc<SCondvar>,
|
||||
sleep: Arc<AtomicBool>,
|
||||
state: Arc<(Mutex<State>, Condvar)>,
|
||||
id: usize,
|
||||
) {
|
||||
while !deleting.load(AtomicOrdering::Acquire) {
|
||||
loop {
|
||||
// check current state.
|
||||
{
|
||||
while sleep.load(AtomicOrdering::SeqCst) {
|
||||
trace!(target: "verification", "Verifier sleeping");
|
||||
::std::thread::park();
|
||||
trace!(target: "verification", "Verifier waking up");
|
||||
let mut cur_state = state.0.lock();
|
||||
while let State::Work(x) = *cur_state {
|
||||
// sleep until this thread is required.
|
||||
if id < x { break }
|
||||
|
||||
if deleting.load(AtomicOrdering::Acquire) {
|
||||
return;
|
||||
}
|
||||
debug!(target: "verification", "verifier {} sleeping", id);
|
||||
state.1.wait(&mut cur_state);
|
||||
debug!(target: "verification", "verifier {} waking up", id);
|
||||
}
|
||||
|
||||
if let State::Exit = *cur_state {
|
||||
debug!(target: "verification", "verifier {} exiting", id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// wait for work if empty.
|
||||
{
|
||||
let mut more_to_verify = verification.more_to_verify.lock().unwrap();
|
||||
|
||||
@@ -305,15 +289,22 @@ impl<K: Kind> VerificationQueue<K> {
|
||||
empty.notify_all();
|
||||
}
|
||||
|
||||
while verification.unverified.lock().is_empty() && !deleting.load(AtomicOrdering::Acquire) {
|
||||
while verification.unverified.lock().is_empty() {
|
||||
if let State::Exit = *state.0.lock() {
|
||||
debug!(target: "verification", "verifier {} exiting", id);
|
||||
return;
|
||||
}
|
||||
|
||||
more_to_verify = wait.wait(more_to_verify).unwrap();
|
||||
}
|
||||
|
||||
if deleting.load(AtomicOrdering::Acquire) {
|
||||
if let State::Exit = *state.0.lock() {
|
||||
debug!(target: "verification", "verifier {} exiting", id);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// do work.
|
||||
let item = {
|
||||
// acquire these locks before getting the item to verify.
|
||||
let mut unverified = verification.unverified.lock();
|
||||
@@ -568,6 +559,14 @@ impl<K: Kind> VerificationQueue<K> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the current number of working verifiers.
|
||||
pub fn num_verifiers(&self) -> usize {
|
||||
match *self.state.0.lock() {
|
||||
State::Work(x) => x,
|
||||
State::Exit => panic!("state only set to exit on drop; queue live now; qed"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Optimise memory footprint of the heap fields, and adjust the number of threads
|
||||
/// to better suit the workload.
|
||||
pub fn collect_garbage(&self) {
|
||||
@@ -604,7 +603,7 @@ impl<K: Kind> VerificationQueue<K> {
|
||||
return;
|
||||
}
|
||||
|
||||
let current = self.verifiers.lock().1;
|
||||
let current = self.num_verifiers();
|
||||
|
||||
let diff = (v_len - u_len).abs();
|
||||
let total = v_len + u_len;
|
||||
@@ -626,27 +625,14 @@ impl<K: Kind> VerificationQueue<K> {
|
||||
// possible, never going over the amount of initially allocated threads
|
||||
// or below 1.
|
||||
fn scale_verifiers(&self, target: usize) {
|
||||
let mut verifiers = self.verifiers.lock();
|
||||
let &mut (ref mut verifiers, ref mut verifier_count) = &mut *verifiers;
|
||||
|
||||
let target = min(verifiers.len(), target);
|
||||
let current = self.num_verifiers();
|
||||
let target = min(self.verifier_handles.len(), target);
|
||||
let target = max(1, target);
|
||||
|
||||
debug!(target: "verification", "Scaling from {} to {} verifiers", verifier_count, target);
|
||||
debug!(target: "verification", "Scaling from {} to {} verifiers", current, target);
|
||||
|
||||
// scaling up
|
||||
for i in *verifier_count..target {
|
||||
debug!(target: "verification", "Waking up verifier {}", i);
|
||||
verifiers[i].wake_up();
|
||||
}
|
||||
|
||||
// scaling down.
|
||||
for i in target..*verifier_count {
|
||||
debug!(target: "verification", "Putting verifier {} to sleep", i);
|
||||
verifiers[i].sleep();
|
||||
}
|
||||
|
||||
*verifier_count = target;
|
||||
*self.state.0.lock() = State::Work(target);
|
||||
self.state.1.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -660,22 +646,18 @@ impl<K: Kind> Drop for VerificationQueue<K> {
|
||||
fn drop(&mut self) {
|
||||
trace!(target: "shutdown", "[VerificationQueue] Closing...");
|
||||
self.clear();
|
||||
self.deleting.store(true, AtomicOrdering::Release);
|
||||
self.deleting.store(true, AtomicOrdering::SeqCst);
|
||||
|
||||
let mut verifiers = self.verifiers.get_mut();
|
||||
let mut verifiers = &mut verifiers.0;
|
||||
|
||||
// first pass to signal conclusion. must be done before
|
||||
// notify or deadlock possible.
|
||||
for handle in verifiers.iter() {
|
||||
handle.conclude();
|
||||
}
|
||||
// set exit state; should be done before `more_to_verify` notification.
|
||||
*self.state.0.lock() = State::Exit;
|
||||
self.state.1.notify_all();
|
||||
|
||||
// wake up all threads waiting for more work.
|
||||
self.more_to_verify.notify_all();
|
||||
|
||||
// second pass to join.
|
||||
for handle in verifiers.drain(..) {
|
||||
handle.join();
|
||||
// wait for all verifier threads to join.
|
||||
for thread in self.verifier_handles.drain(..) {
|
||||
thread.join().expect("Propagating verifier thread panic on shutdown");
|
||||
}
|
||||
|
||||
trace!(target: "shutdown", "[VerificationQueue] Closed.");
|
||||
@@ -687,7 +669,7 @@ mod tests {
|
||||
use util::*;
|
||||
use io::*;
|
||||
use spec::*;
|
||||
use super::{BlockQueue, Config};
|
||||
use super::{BlockQueue, Config, State};
|
||||
use super::kind::blocks::Unverified;
|
||||
use tests::helpers::*;
|
||||
use error::*;
|
||||
@@ -784,11 +766,11 @@ mod tests {
|
||||
let queue = get_test_queue();
|
||||
queue.scale_verifiers(MAX_VERIFIERS + 1);
|
||||
|
||||
assert!(queue.verifiers.lock().1 < MAX_VERIFIERS + 1);
|
||||
assert!(queue.num_verifiers() < MAX_VERIFIERS + 1);
|
||||
|
||||
queue.scale_verifiers(0);
|
||||
|
||||
assert!(queue.verifiers.lock().1 == 1);
|
||||
assert!(queue.num_verifiers() == 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -797,14 +779,7 @@ mod tests {
|
||||
|
||||
// put all the verifiers to sleep to ensure
|
||||
// the test isn't timing sensitive.
|
||||
let num_verifiers = {
|
||||
let verifiers = queue.verifiers.lock();
|
||||
for i in 0..verifiers.1 {
|
||||
verifiers.0[i].sleep();
|
||||
}
|
||||
|
||||
verifiers.1
|
||||
};
|
||||
*queue.state.0.lock() = State::Work(0);
|
||||
|
||||
for block in get_good_dummy_block_seq(5000) {
|
||||
queue.import(Unverified::new(block)).expect("Block good by definition; qed");
|
||||
@@ -812,20 +787,12 @@ mod tests {
|
||||
|
||||
// almost all unverified == bump verifier count.
|
||||
queue.collect_garbage();
|
||||
assert_eq!(queue.verifiers.lock().1, num_verifiers + 1);
|
||||
|
||||
// wake them up again and verify everything.
|
||||
{
|
||||
let verifiers = queue.verifiers.lock();
|
||||
for i in 0..verifiers.1 {
|
||||
verifiers.0[i].wake_up();
|
||||
}
|
||||
}
|
||||
assert_eq!(queue.num_verifiers(), 1);
|
||||
|
||||
queue.flush();
|
||||
|
||||
// nothing to verify == use minimum number of verifiers.
|
||||
queue.collect_garbage();
|
||||
assert_eq!(queue.verifiers.lock().1, 1);
|
||||
assert_eq!(queue.num_verifiers(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user