Merge branch 'master' into lightserv

This commit is contained in:
Robert Habermeier
2016-12-09 01:29:46 +01:00
132 changed files with 2594 additions and 767 deletions

View File

@@ -194,6 +194,11 @@ impl AccountProvider {
Ok(self.address_book.write().set_meta(account, meta))
}
/// Removes and address from the addressbook
pub fn remove_address(&self, addr: Address) -> Result<(), Error> {
Ok(self.address_book.write().remove(addr))
}
/// Returns each account along with name and meta.
pub fn accounts_info(&self) -> Result<HashMap<Address, AccountMeta>, Error> {
let r: HashMap<Address, AccountMeta> = try!(self.sstore.accounts())

View File

@@ -74,6 +74,12 @@ impl AddressBook {
}
self.save();
}
/// Removes an entry
pub fn remove(&mut self, a: Address) {
self.cache.remove(&a);
self.save();
}
}
/// Dapps user settings
@@ -244,4 +250,22 @@ mod tests {
}
]);
}
#[test]
fn should_remove_address() {
let temp = RandomTempPath::create_dir();
let path = temp.as_str().to_owned();
let mut b = AddressBook::new(path.clone());
b.set_name(1.into(), "One".to_owned());
b.set_name(2.into(), "Two".to_owned());
b.set_name(3.into(), "Three".to_owned());
b.remove(2.into());
let b = AddressBook::new(path);
assert_eq!(b.get(), hash_map![
1.into() => AccountMeta{name: "One".to_owned(), meta: "{}".to_owned(), uuid: None},
3.into() => AccountMeta{name: "Three".to_owned(), meta: "{}".to_owned(), uuid: None}
]);
}
}

View File

@@ -21,13 +21,15 @@ use std::sync::Weak;
use std::time::{UNIX_EPOCH, Duration};
use util::*;
use ethkey::{verify_address, Signature};
use rlp::{Rlp, UntrustedRlp, View, encode};
use rlp::{UntrustedRlp, Rlp, View, encode};
use account_provider::AccountProvider;
use block::*;
use spec::CommonParams;
use engines::Engine;
use header::Header;
use error::{Error, BlockError};
use blockchain::extras::BlockDetails;
use views::HeaderView;
use evm::Schedule;
use ethjson;
use io::{IoContext, IoHandler, TimerToken, IoService, IoChannel};
@@ -35,8 +37,6 @@ use service::ClientIoMessage;
use transaction::SignedTransaction;
use env_info::EnvInfo;
use builtin::Builtin;
use blockchain::extras::BlockDetails;
use views::HeaderView;
/// `AuthorityRound` params.
#[derive(Debug, PartialEq)]
@@ -68,18 +68,20 @@ pub struct AuthorityRound {
params: CommonParams,
our_params: AuthorityRoundParams,
builtins: BTreeMap<Address, Builtin>,
transition_service: IoService<BlockArrived>,
transition_service: IoService<()>,
message_channel: Mutex<Option<IoChannel<ClientIoMessage>>>,
step: AtomicUsize,
proposed: AtomicBool,
account_provider: Mutex<Option<Arc<AccountProvider>>>,
password: RwLock<Option<String>>,
}
fn header_step(header: &Header) -> Result<usize, ::rlp::DecoderError> {
UntrustedRlp::new(&header.seal()[0]).as_val()
UntrustedRlp::new(&header.seal().get(0).expect("was either checked with verify_block_basic or is genesis; has 2 fields; qed (Make sure the spec file has a correct genesis seal)")).as_val()
}
fn header_signature(header: &Header) -> Result<Signature, ::rlp::DecoderError> {
UntrustedRlp::new(&header.seal()[1]).as_val::<H520>().map(Into::into)
UntrustedRlp::new(&header.seal().get(1).expect("was checked with verify_block_basic; has 2 fields; qed")).as_val::<H520>().map(Into::into)
}
trait AsMillis {
@@ -101,10 +103,12 @@ impl AuthorityRound {
params: params,
our_params: our_params,
builtins: builtins,
transition_service: try!(IoService::<BlockArrived>::start()),
transition_service: try!(IoService::<()>::start()),
message_channel: Mutex::new(None),
step: AtomicUsize::new(initial_step),
proposed: AtomicBool::new(false)
proposed: AtomicBool::new(false),
account_provider: Mutex::new(None),
password: RwLock::new(None),
});
let handler = TransitionHandler { engine: Arc::downgrade(&engine) };
try!(engine.transition_service.register_handler(Arc::new(handler)));
@@ -143,20 +147,17 @@ struct TransitionHandler {
engine: Weak<AuthorityRound>,
}
#[derive(Clone)]
struct BlockArrived;
const ENGINE_TIMEOUT_TOKEN: TimerToken = 23;
impl IoHandler<BlockArrived> for TransitionHandler {
fn initialize(&self, io: &IoContext<BlockArrived>) {
impl IoHandler<()> for TransitionHandler {
fn initialize(&self, io: &IoContext<()>) {
if let Some(engine) = self.engine.upgrade() {
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, engine.remaining_step_duration().as_millis())
.unwrap_or_else(|e| warn!(target: "poa", "Failed to start consensus step timer: {}.", e))
}
}
fn timeout(&self, io: &IoContext<BlockArrived>, timer: TimerToken) {
fn timeout(&self, io: &IoContext<()>, timer: TimerToken) {
if timer == ENGINE_TIMEOUT_TOKEN {
if let Some(engine) = self.engine.upgrade() {
engine.step.fetch_add(1, AtomicOrdering::SeqCst);
@@ -208,10 +209,6 @@ impl Engine for AuthorityRound {
});
}
/// Apply the block reward on finalisation of the block.
/// This assumes that all uncles are valid uncles (i.e. of at least one generation before the current).
fn on_close_block(&self, _block: &mut ExecutedBlock) {}
fn is_sealer(&self, author: &Address) -> Option<bool> {
let p = &self.our_params;
Some(p.authorities.contains(author))
@@ -221,14 +218,14 @@ impl Engine for AuthorityRound {
///
/// This operation is synchronous and may (quite reasonably) not be available, in which `false` will
/// be returned.
fn generate_seal(&self, block: &ExecutedBlock, accounts: Option<&AccountProvider>) -> Option<Vec<Bytes>> {
fn generate_seal(&self, block: &ExecutedBlock) -> Option<Vec<Bytes>> {
if self.proposed.load(AtomicOrdering::SeqCst) { return None; }
let header = block.header();
let step = self.step();
if self.is_step_proposer(step, header.author()) {
if let Some(ap) = accounts {
if let Some(ref ap) = *self.account_provider.lock() {
// Account should be permanently unlocked, otherwise sealing will fail.
if let Ok(signature) = ap.sign(*header.author(), None, header.bare_hash()) {
if let Ok(signature) = ap.sign(*header.author(), self.password.read().clone(), header.bare_hash()) {
trace!(target: "poa", "generate_seal: Issuing a block for step {}.", step);
self.proposed.store(true, AtomicOrdering::SeqCst);
return Some(vec![encode(&step).to_vec(), encode(&(&*signature as &[u8])).to_vec()]);
@@ -303,23 +300,30 @@ impl Engine for AuthorityRound {
t.sender().map(|_|()) // Perform EC recovery and cache sender
}
fn register_message_channel(&self, message_channel: IoChannel<ClientIoMessage>) {
let mut guard = self.message_channel.lock();
*guard = Some(message_channel);
}
fn is_new_best_block(&self, _best_total_difficulty: U256, best_header: HeaderView, _parent_details: &BlockDetails, new_header: &HeaderView) -> bool {
let new_number = new_header.number();
let best_number = best_header.number();
if new_number != best_number {
new_number > best_number
} else {
// Take the oldest step at given height.
let new_step: usize = Rlp::new(&new_header.seal()[0]).as_val();
// Take the oldest step at given height.
let new_step: usize = Rlp::new(&new_header.seal()[0]).as_val();
let best_step: usize = Rlp::new(&best_header.seal()[0]).as_val();
new_step < best_step
}
}
fn register_message_channel(&self, message_channel: IoChannel<ClientIoMessage>) {
*self.message_channel.lock() = Some(message_channel);
}
fn set_signer(&self, _address: Address, password: String) {
*self.password.write() = Some(password);
}
fn register_account_provider(&self, account_provider: Arc<AccountProvider>) {
*self.account_provider.lock() = Some(account_provider);
}
}
#[cfg(test)]
@@ -387,12 +391,11 @@ mod tests {
fn generates_seal_and_does_not_double_propose() {
let tap = AccountProvider::transient_provider();
let addr1 = tap.insert_account("1".sha3(), "1").unwrap();
tap.unlock_account_permanently(addr1, "1".into()).unwrap();
let addr2 = tap.insert_account("2".sha3(), "2").unwrap();
tap.unlock_account_permanently(addr2, "2".into()).unwrap();
let spec = Spec::new_test_round();
let engine = &*spec.engine;
engine.register_account_provider(Arc::new(tap));
let genesis_header = spec.genesis_header();
let mut db1 = get_temp_state_db().take();
spec.ensure_db_good(&mut db1, &TrieFactory::new(TrieSpec::Secure)).unwrap();
@@ -404,16 +407,18 @@ mod tests {
let b2 = OpenBlock::new(engine, Default::default(), false, db2, &genesis_header, last_hashes, addr2, (3141562.into(), 31415620.into()), vec![]).unwrap();
let b2 = b2.close_and_lock();
if let Some(seal) = engine.generate_seal(b1.block(), Some(&tap)) {
engine.set_signer(addr1, "1".into());
if let Some(seal) = engine.generate_seal(b1.block()) {
assert!(b1.clone().try_seal(engine, seal).is_ok());
// Second proposal is forbidden.
assert!(engine.generate_seal(b1.block(), Some(&tap)).is_none());
assert!(engine.generate_seal(b1.block()).is_none());
}
if let Some(seal) = engine.generate_seal(b2.block(), Some(&tap)) {
engine.set_signer(addr2, "2".into());
if let Some(seal) = engine.generate_seal(b2.block()) {
assert!(b2.clone().try_seal(engine, seal).is_ok());
// Second proposal is forbidden.
assert!(engine.generate_seal(b2.block(), Some(&tap)).is_none());
assert!(engine.generate_seal(b2.block()).is_none());
}
}

View File

@@ -58,6 +58,8 @@ pub struct BasicAuthority {
params: CommonParams,
our_params: BasicAuthorityParams,
builtins: BTreeMap<Address, Builtin>,
account_provider: Mutex<Option<Arc<AccountProvider>>>,
password: RwLock<Option<String>>,
}
impl BasicAuthority {
@@ -67,6 +69,8 @@ impl BasicAuthority {
params: params,
our_params: our_params,
builtins: builtins,
account_provider: Mutex::new(None),
password: RwLock::new(None),
}
}
}
@@ -98,13 +102,8 @@ impl Engine for BasicAuthority {
max(gas_floor_target, gas_limit - gas_limit / bound_divisor + 1.into())
}
});
// info!("ethash: populate_from_parent #{}: difficulty={} and gas_limit={}", header.number, header.difficulty, header.gas_limit);
}
/// Apply the block reward on finalisation of the block.
/// This assumes that all uncles are valid uncles (i.e. of at least one generation before the current).
fn on_close_block(&self, _block: &mut ExecutedBlock) {}
fn is_sealer(&self, author: &Address) -> Option<bool> {
Some(self.our_params.authorities.contains(author))
}
@@ -113,12 +112,12 @@ impl Engine for BasicAuthority {
///
/// This operation is synchronous and may (quite reasonably) not be available, in which `false` will
/// be returned.
fn generate_seal(&self, block: &ExecutedBlock, accounts: Option<&AccountProvider>) -> Option<Vec<Bytes>> {
if let Some(ap) = accounts {
fn generate_seal(&self, block: &ExecutedBlock) -> Option<Vec<Bytes>> {
if let Some(ref ap) = *self.account_provider.lock() {
let header = block.header();
let message = header.bare_hash();
// account should be pernamently unlocked, otherwise sealing will fail
if let Ok(signature) = ap.sign(*block.header().author(), None, message) {
if let Ok(signature) = ap.sign(*block.header().author(), self.password.read().clone(), message) {
return Some(vec![::rlp::encode(&(&*signature as &[u8])).to_vec()]);
} else {
trace!(target: "basicauthority", "generate_seal: FAIL: accounts secret key unavailable");
@@ -179,6 +178,14 @@ impl Engine for BasicAuthority {
fn verify_transaction(&self, t: &SignedTransaction, _header: &Header) -> Result<(), Error> {
t.sender().map(|_|()) // Perform EC recovery and cache sender
}
fn set_signer(&self, _address: Address, password: String) {
*self.password.write() = Some(password);
}
fn register_account_provider(&self, ap: Arc<AccountProvider>) {
*self.account_provider.lock() = Some(ap);
}
}
#[cfg(test)]
@@ -250,10 +257,11 @@ mod tests {
fn can_generate_seal() {
let tap = AccountProvider::transient_provider();
let addr = tap.insert_account("".sha3(), "").unwrap();
tap.unlock_account_permanently(addr, "".into()).unwrap();
let spec = new_test_authority();
let engine = &*spec.engine;
engine.set_signer(addr, "".into());
engine.register_account_provider(Arc::new(tap));
let genesis_header = spec.genesis_header();
let mut db_result = get_temp_state_db();
let mut db = db_result.take();
@@ -261,7 +269,7 @@ mod tests {
let last_hashes = Arc::new(vec![genesis_header.hash()]);
let b = OpenBlock::new(engine, Default::default(), false, db, &genesis_header, last_hashes, addr, (3141562.into(), 31415620.into()), vec![]).unwrap();
let b = b.close_and_lock();
let seal = engine.generate_seal(b.block(), Some(&tap)).unwrap();
let seal = engine.generate_seal(b.block()).unwrap();
assert!(b.try_seal(engine, seal).is_ok());
}

View File

@@ -23,7 +23,6 @@ use spec::CommonParams;
use evm::Schedule;
use block::ExecutedBlock;
use util::Bytes;
use account_provider::AccountProvider;
/// An engine which does not provide any consensus mechanism, just seals blocks internally.
pub struct InstantSeal {
@@ -60,7 +59,7 @@ impl Engine for InstantSeal {
fn is_sealer(&self, _author: &Address) -> Option<bool> { Some(true) }
fn generate_seal(&self, _block: &ExecutedBlock, _accounts: Option<&AccountProvider>) -> Option<Vec<Bytes>> {
fn generate_seal(&self, _block: &ExecutedBlock) -> Option<Vec<Bytes>> {
Some(Vec::new())
}
}
@@ -70,16 +69,12 @@ mod tests {
use util::*;
use util::trie::TrieSpec;
use tests::helpers::*;
use account_provider::AccountProvider;
use spec::Spec;
use header::Header;
use block::*;
#[test]
fn instant_can_seal() {
let tap = AccountProvider::transient_provider();
let addr = tap.insert_account("".sha3(), "").unwrap();
let spec = Spec::new_instant();
let engine = &*spec.engine;
let genesis_header = spec.genesis_header();
@@ -87,10 +82,9 @@ mod tests {
let mut db = db_result.take();
spec.ensure_db_good(&mut db, &TrieFactory::new(TrieSpec::Secure)).unwrap();
let last_hashes = Arc::new(vec![genesis_header.hash()]);
let b = OpenBlock::new(engine, Default::default(), false, db, &genesis_header, last_hashes, addr, (3141562.into(), 31415620.into()), vec![]).unwrap();
let b = OpenBlock::new(engine, Default::default(), false, db, &genesis_header, last_hashes, Address::default(), (3141562.into(), 31415620.into()), vec![]).unwrap();
let b = b.close_and_lock();
// Seal with empty AccountProvider.
let seal = engine.generate_seal(b.block(), Some(&tap)).unwrap();
let seal = engine.generate_seal(b.block()).unwrap();
assert!(b.try_seal(engine, seal).is_ok());
}

View File

@@ -94,7 +94,7 @@ pub trait Engine : Sync + Send {
///
/// This operation is synchronous and may (quite reasonably) not be available, in which None will
/// be returned.
fn generate_seal(&self, _block: &ExecutedBlock, _accounts: Option<&AccountProvider>) -> Option<Vec<Bytes>> { None }
fn generate_seal(&self, _block: &ExecutedBlock) -> Option<Vec<Bytes>> { None }
/// Phase 1 quick block verification. Only does checks that are cheap. `block` (the header's full block)
/// may be provided for additional checks. Returns either a null `Ok` or a general error detailing the problem with import.
@@ -147,11 +147,17 @@ pub trait Engine : Sync + Send {
self.builtins().get(a).expect("attempted to execute nonexistent builtin").execute(input, output);
}
/// Add a channel for communication with Client which can be used for sealing.
fn register_message_channel(&self, _message_channel: IoChannel<ClientIoMessage>) {}
/// Check if new block should be chosen as the one in chain.
fn is_new_best_block(&self, best_total_difficulty: U256, _best_header: HeaderView, parent_details: &BlockDetails, new_header: &HeaderView) -> bool {
ethash::is_new_best_block(best_total_difficulty, parent_details, new_header)
}
/// Register an account which signs consensus messages.
fn set_signer(&self, _address: Address, _password: String) {}
/// Add a channel for communication with Client which can be used for sealing.
fn register_message_channel(&self, _message_channel: IoChannel<ClientIoMessage>) {}
/// Add an account provider useful for Engines that sign stuff.
fn register_account_provider(&self, _account_provider: Arc<AccountProvider>) {}
}

View File

@@ -19,7 +19,7 @@ use std::time::{Instant, Duration};
use util::*;
use util::using_queue::{UsingQueue, GetAction};
use account_provider::AccountProvider;
use account_provider::{AccountProvider, Error as AccountError};
use views::{BlockView, HeaderView};
use header::Header;
use state::{State, CleanupMode};
@@ -464,15 +464,12 @@ impl Miner {
/// Attempts to perform internal sealing (one that does not require work) to return Ok(sealed),
/// Err(Some(block)) returns for unsuccesful sealing while Err(None) indicates misspecified engine.
fn seal_block_internally(&self, block: ClosedBlock) -> Result<SealedBlock, Option<ClosedBlock>> {
trace!(target: "miner", "seal_block_internally: block has transaction - attempting internal seal.");
let s = self.engine.generate_seal(block.block(), match self.accounts {
Some(ref x) => Some(&**x),
None => None,
});
trace!(target: "miner", "seal_block_internally: attempting internal seal.");
let s = self.engine.generate_seal(block.block());
if let Some(seal) = s {
trace!(target: "miner", "seal_block_internally: managed internal seal. importing...");
block.lock().try_seal(&*self.engine, seal).or_else(|_| {
warn!("prepare_sealing: ERROR: try_seal failed when given internally generated seal. WTF?");
block.lock().try_seal(&*self.engine, seal).or_else(|(e, _)| {
warn!("prepare_sealing: ERROR: try_seal failed when given internally generated seal: {}", e);
Err(None)
})
} else {
@@ -485,7 +482,7 @@ impl Miner {
fn seal_and_import_block_internally(&self, chain: &MiningBlockChainClient, block: ClosedBlock) -> bool {
if !block.transactions().is_empty() || self.forced_sealing() {
if let Ok(sealed) = self.seal_block_internally(block) {
if chain.import_block(sealed.rlp_bytes()).is_ok() {
if chain.import_sealed_block(sealed).is_ok() {
trace!(target: "miner", "import_block_internally: imported internally sealed block");
return true
}
@@ -740,6 +737,19 @@ impl MinerService for Miner {
*self.author.write() = author;
}
fn set_engine_signer(&self, address: Address, password: String) -> Result<(), AccountError> {
if self.seals_internally {
if let Some(ref ap) = self.accounts {
try!(ap.sign(address.clone(), Some(password.clone()), Default::default()));
}
let mut sealing_work = self.sealing_work.lock();
sealing_work.enabled = self.engine.is_sealer(&address).unwrap_or(false);
*self.author.write() = address;
self.engine.set_signer(address, password);
}
Ok(())
}
fn set_extra_data(&self, extra_data: Bytes) {
*self.extra_data.write() = extra_data;
}
@@ -1042,7 +1052,7 @@ impl MinerService for Miner {
ret.map(f)
}
fn submit_seal(&self, chain: &MiningBlockChainClient, pow_hash: H256, seal: Vec<Bytes>) -> Result<(), Error> {
fn submit_seal(&self, chain: &MiningBlockChainClient, block_hash: H256, seal: Vec<Bytes>) -> Result<(), Error> {
let result =
if let Some(b) = self.sealing_work.lock().queue.get_used_if(
if self.options.enable_resubmission {
@@ -1050,22 +1060,22 @@ impl MinerService for Miner {
} else {
GetAction::Take
},
|b| &b.hash() == &pow_hash
|b| &b.hash() == &block_hash
) {
trace!(target: "miner", "Sealing block {}={}={} with seal {:?}", pow_hash, b.hash(), b.header().bare_hash(), seal);
trace!(target: "miner", "Submitted block {}={}={} with seal {:?}", block_hash, b.hash(), b.header().bare_hash(), seal);
b.lock().try_seal(&*self.engine, seal).or_else(|(e, _)| {
warn!(target: "miner", "Mined solution rejected: {}", e);
Err(Error::PowInvalid)
})
} else {
warn!(target: "miner", "Mined solution rejected: Block unknown or out of date.");
warn!(target: "miner", "Submitted solution rejected: Block unknown or out of date.");
Err(Error::PowHashInvalid)
};
result.and_then(|sealed| {
let n = sealed.header().number();
let h = sealed.header().hash();
try!(chain.import_sealed_block(sealed));
info!(target: "miner", "Mined block imported OK. #{}: {}", Colour::White.bold().paint(format!("{}", n)), Colour::White.bold().paint(h.hex()));
info!(target: "miner", "Submitted block imported OK. #{}: {}", Colour::White.bold().paint(format!("{}", n)), Colour::White.bold().paint(h.hex()));
Ok(())
})
}

View File

@@ -76,6 +76,9 @@ pub trait MinerService : Send + Sync {
/// Set the author that we will seal blocks as.
fn set_author(&self, author: Address);
/// Set info necessary to sign consensus messages.
fn set_engine_signer(&self, address: Address, password: String) -> Result<(), ::account_provider::Error>;
/// Get the extra_data that we will seal blocks with.
fn extra_data(&self) -> Bytes;

View File

@@ -304,6 +304,9 @@ impl SignedTransaction {
/// 0 if `v` would have been 27 under "Electrum" notation, 1 if 28 or 4 if invalid.
pub fn standard_v(&self) -> u8 { match self.v { v if v == 27 || v == 28 || v > 36 => ((v - 1) % 2) as u8, _ => 4 } }
/// The `v` value that appears in the RLP.
pub fn original_v(&self) -> u64 { self.v }
/// The network ID, or `None` if this is a global transaction.
pub fn network_id(&self) -> Option<u64> {
match self.v {

View File

@@ -53,6 +53,8 @@ pub struct Config {
/// Maximum heap memory to use.
/// When the limit is reached, is_full returns true.
pub max_mem_use: usize,
/// Settings for the number of verifiers and adaptation strategy.
pub verifier_settings: VerifierSettings,
}
impl Default for Config {
@@ -60,6 +62,26 @@ impl Default for Config {
Config {
max_queue_size: 30000,
max_mem_use: 50 * 1024 * 1024,
verifier_settings: VerifierSettings::default(),
}
}
}
/// Verifier settings.
#[derive(Debug, PartialEq, Clone)]
pub struct VerifierSettings {
/// Whether to scale amount of verifiers according to load.
// Todo: replace w/ strategy enum?
pub scale_verifiers: bool,
/// Beginning amount of verifiers.
pub num_verifiers: usize,
}
impl Default for VerifierSettings {
fn default() -> Self {
VerifierSettings {
scale_verifiers: false,
num_verifiers: MAX_VERIFIERS,
}
}
}
@@ -114,6 +136,7 @@ pub struct VerificationQueue<K: Kind> {
ticks_since_adjustment: AtomicUsize,
max_queue_size: usize,
max_mem_use: usize,
scale_verifiers: bool,
verifier_handles: Vec<JoinHandle<()>>,
state: Arc<(Mutex<State>, Condvar)>,
}
@@ -198,13 +221,16 @@ impl<K: Kind> VerificationQueue<K> {
});
let empty = Arc::new(SCondvar::new());
let panic_handler = PanicHandler::new_in_arc();
let scale_verifiers = config.verifier_settings.scale_verifiers;
let max_verifiers = min(::num_cpus::get(), MAX_VERIFIERS);
let default_amount = max(::num_cpus::get(), 3) - 2;
let state = Arc::new((Mutex::new(State::Work(default_amount)), Condvar::new()));
let num_cpus = ::num_cpus::get();
let max_verifiers = min(num_cpus, MAX_VERIFIERS);
let default_amount = max(1, min(max_verifiers, config.verifier_settings.num_verifiers));
let state = Arc::new((Mutex::new(State::Work(default_amount)), Condvar::new()));
let mut verifier_handles = Vec::with_capacity(max_verifiers);
debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount);
debug!(target: "verification", "Verifier auto-scaling {}", if scale_verifiers { "enabled" } else { "disabled" });
for i in 0..max_verifiers {
debug!(target: "verification", "Adding verification thread #{}", i);
@@ -248,6 +274,7 @@ impl<K: Kind> VerificationQueue<K> {
ticks_since_adjustment: AtomicUsize::new(0),
max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT),
max_mem_use: max(config.max_mem_use, MIN_MEM_LIMIT),
scale_verifiers: scale_verifiers,
verifier_handles: verifier_handles,
state: state,
}
@@ -597,6 +624,8 @@ impl<K: Kind> VerificationQueue<K> {
self.processing.write().shrink_to_fit();
if !self.scale_verifiers { return }
if self.ticks_since_adjustment.fetch_add(1, AtomicOrdering::SeqCst) + 1 >= READJUSTMENT_PERIOD {
self.ticks_since_adjustment.store(0, AtomicOrdering::SeqCst);
} else {
@@ -675,10 +704,15 @@ mod tests {
use error::*;
use views::*;
fn get_test_queue() -> BlockQueue {
// create a test block queue.
// auto_scaling enables verifier adjustment.
fn get_test_queue(auto_scale: bool) -> BlockQueue {
let spec = get_test_spec();
let engine = spec.engine;
BlockQueue::new(Config::default(), engine, IoChannel::disconnected(), true)
let mut config = Config::default();
config.verifier_settings.scale_verifiers = auto_scale;
BlockQueue::new(config, engine, IoChannel::disconnected(), true)
}
#[test]
@@ -691,7 +725,7 @@ mod tests {
#[test]
fn can_import_blocks() {
let queue = get_test_queue();
let queue = get_test_queue(false);
if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) {
panic!("error importing block that is valid by definition({:?})", e);
}
@@ -699,7 +733,7 @@ mod tests {
#[test]
fn returns_error_for_duplicates() {
let queue = get_test_queue();
let queue = get_test_queue(false);
if let Err(e) = queue.import(Unverified::new(get_good_dummy_block())) {
panic!("error importing block that is valid by definition({:?})", e);
}
@@ -718,7 +752,7 @@ mod tests {
#[test]
fn returns_ok_for_drained_duplicates() {
let queue = get_test_queue();
let queue = get_test_queue(false);
let block = get_good_dummy_block();
let hash = BlockView::new(&block).header().hash().clone();
if let Err(e) = queue.import(Unverified::new(block)) {
@@ -735,7 +769,7 @@ mod tests {
#[test]
fn returns_empty_once_finished() {
let queue = get_test_queue();
let queue = get_test_queue(false);
queue.import(Unverified::new(get_good_dummy_block()))
.expect("error importing block that is valid by definition");
queue.flush();
@@ -763,7 +797,7 @@ mod tests {
fn scaling_limits() {
use super::MAX_VERIFIERS;
let queue = get_test_queue();
let queue = get_test_queue(true);
queue.scale_verifiers(MAX_VERIFIERS + 1);
assert!(queue.num_verifiers() < MAX_VERIFIERS + 1);
@@ -775,7 +809,7 @@ mod tests {
#[test]
fn readjust_verifiers() {
let queue = get_test_queue();
let queue = get_test_queue(true);
// put all the verifiers to sleep to ensure
// the test isn't timing sensitive.