[beta] Backports (#8916)

* `duration_ns: u64 -> duration: Duration` (#8457)

* duration_ns: u64 -> duration: Duration

* format on millis {:.2} -> {}

* Keep all enacted blocks notify in order (#8524)

* Keep all enacted blocks notify in order

* Collect is unnecessary

* Update ChainNotify to use ChainRouteType

* Fix all ethcore fn defs

* Wrap the type within ChainRoute

* Fix private-tx and sync api

* Fix secret_store API

* Fix updater API

* Fix rpc api

* Fix informant api

* Eagerly cache enacted/retracted and remove contain_enacted/retracted

* Fix indent

* tests: should use full expr form for struct constructor

* Use into_enacted_retracted to further avoid copy

* typo: not a function

* rpc/tests: ChainRoute -> ChainRoute::new

* Handle removed logs in filter changes and add geth compatibility field (#8796)

* Add removed geth compatibility field in log

* Fix mocked tests

* Add field block hash in PollFilter

* Store last block hash info for log filters

* Implement canon route

* Use canon logs for fetching reorg logs

Light client removed logs fetching is disabled. It looks expensive.

* Make sure removed flag is set

* Address grumbles

* Fixed AuthorityRound deadlock on shutdown, closes #8088 (#8803)

* CI: Fix docker tags (#8822)

* scripts: enable docker builds for beta and stable

* scripts: docker latest should be beta not master

* scripts: docker latest is master

* ethcore: fix ancient block error msg handling (#8832)

* Disable parallel verification and skip verifiying already imported txs. (#8834)

* Reject transactions that are already in pool without verifying them.

* Avoid verifying already imported transactions.

* Fix concurrent access to signer queue (#8854)

* Fix concurrent access to signer queue

* Put request back to the queue if confirmation failed

* typo: fix docs and rename functions to be more specific

`request_notify` does not need to be public, and it's renamed to `notify_result`.
`notify` is renamed to `notify_message`.

* Change trace info "Transaction" -> "Request"

* Don't allocate in expect_valid_rlp unless necessary (#8867)

* don't allocate via format! in case there's no error

* fix test?

* fixed ipc leak, closes #8774 (#8876)

* Add new ovh bootnodes and fix port for foundation bootnode 3.2 (#8886)

* Add new ovh bootnodes and fix port for foundation bootnode 3.2

* Remove old bootnodes.

* Remove duplicate 1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082

* Block 0 is valid in queries (#8891)

Early exit for block nr 0 leads to spurious error about pruning: `…your node is running with state pruning…`.

Fixes #7547, #8762

* Add ETC Cooperative-run load balanced parity node (#8892)

* Minor fix in chain supplier and light provider (#8906)

* fix chain supplier increment

* fix light provider block_headers

* Check whether we need resealing in miner and unwrap has_account in account_provider (#8853)

* Remove unused Result wrap in has_account

* Check whether we need to reseal for external transactions

* Fix reference to has_account interface

* typo: missing )

* Refactor duplicates to prepare_and_update_sealing

* Fix build

* Allow disabling local-by-default for transactions with new config entry (#8882)

* Add tx_queue_allow_unknown_local config option

- Previous commit messages:

dispatcher checks if we have the sender account

Add `tx_queue_allow_unknown_local` to MinerOptions

Add `tx_queue_allow_unknown_local` to config

fix order in MinerOptions to match Configuration

add cli flag for tx_queue_allow_unknown_local

Update refs to `tx_queue_allow_unknown_local`

Add tx_queue_allow_unknown_local to config test

revert changes to dispatcher

Move tx_queue_allow_unknown_local to `import_own_transaction`

Fix var name

if statement should return the values

derp de derp derp derp semicolons

Reset dispatch file to how it was before

fix compile issues + change from FLAG to ARG

add test and use `into`

import MinerOptions, clone the secret

Fix tests?

Compiler/linter issues fixed

Fix linter msg - case of constants

IT LIVES

refactor to omit yucky explict return

update comments

Fix based on diff AccountProvider.has_account method

* Refactor flag name + don't change import_own_tx behaviour

fix arg name

Note: force commit to try and get gitlab tests working again 😠

* Add fn to TestMinerService

* Avoid race condition from trusted sources

- refactor the miner tests a bit to cut down on code reuse
- add `trusted` param to dispatch_transaction and import_claimed_local_transaction

Add param to `import_claimed_local_transaction`

Fix fn sig in tests
This commit is contained in:
André Silva
2018-06-19 09:41:14 +01:00
committed by Afri Schoedon
parent f26a7fe6fa
commit cc44ae9cb5
52 changed files with 659 additions and 323 deletions

View File

@@ -272,8 +272,8 @@ impl AccountProvider {
}
/// Checks whether an account with a given address is present.
pub fn has_account(&self, address: Address) -> Result<bool, Error> {
Ok(self.sstore.account_ref(&address).is_ok() && !self.blacklisted_accounts.contains(&address))
pub fn has_account(&self, address: Address) -> bool {
self.sstore.account_ref(&address).is_ok() && !self.blacklisted_accounts.contains(&address)
}
/// Returns addresses of all accounts.

View File

@@ -20,7 +20,7 @@ use ethereum_types::H256;
use blockchain::block_info::{BlockInfo, BlockLocation};
/// Import route for newly inserted block.
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub struct ImportRoute {
/// Blocks that were invalidated by new block.
pub retracted: Vec<H256>,

View File

@@ -17,6 +17,9 @@
use bytes::Bytes;
use ethereum_types::H256;
use transaction::UnverifiedTransaction;
use blockchain::ImportRoute;
use std::time::Duration;
use std::collections::HashMap;
/// Messages to broadcast via chain
pub enum ChainMessageType {
@@ -28,6 +31,89 @@ pub enum ChainMessageType {
SignedPrivateTransaction(Vec<u8>),
}
/// Route type to indicate whether it is enacted or retracted.
#[derive(Clone)]
pub enum ChainRouteType {
/// Enacted block
Enacted,
/// Retracted block
Retracted
}
/// A complete chain enacted retracted route.
#[derive(Default, Clone)]
pub struct ChainRoute {
route: Vec<(H256, ChainRouteType)>,
enacted: Vec<H256>,
retracted: Vec<H256>,
}
impl<'a> From<&'a [ImportRoute]> for ChainRoute {
fn from(import_results: &'a [ImportRoute]) -> ChainRoute {
ChainRoute::new(import_results.iter().flat_map(|route| {
route.retracted.iter().map(|h| (*h, ChainRouteType::Retracted))
.chain(route.enacted.iter().map(|h| (*h, ChainRouteType::Enacted)))
}).collect())
}
}
impl ChainRoute {
/// Create a new ChainRoute based on block hash and route type pairs.
pub fn new(route: Vec<(H256, ChainRouteType)>) -> Self {
let (enacted, retracted) = Self::to_enacted_retracted(&route);
Self { route, enacted, retracted }
}
/// Gather all non-duplicate enacted and retracted blocks.
fn to_enacted_retracted(route: &[(H256, ChainRouteType)]) -> (Vec<H256>, Vec<H256>) {
fn map_to_vec(map: Vec<(H256, bool)>) -> Vec<H256> {
map.into_iter().map(|(k, _v)| k).collect()
}
// Because we are doing multiple inserts some of the blocks that were enacted in import `k`
// could be retracted in import `k+1`. This is why to understand if after all inserts
// the block is enacted or retracted we iterate over all routes and at the end final state
// will be in the hashmap
let map = route.iter().fold(HashMap::new(), |mut map, route| {
match &route.1 {
&ChainRouteType::Enacted => {
map.insert(route.0, true);
},
&ChainRouteType::Retracted => {
map.insert(route.0, false);
},
}
map
});
// Split to enacted retracted (using hashmap value)
let (enacted, retracted) = map.into_iter().partition(|&(_k, v)| v);
// And convert tuples to keys
(map_to_vec(enacted), map_to_vec(retracted))
}
/// Consume route and return the enacted retracted form.
pub fn into_enacted_retracted(self) -> (Vec<H256>, Vec<H256>) {
(self.enacted, self.retracted)
}
/// All non-duplicate enacted blocks.
pub fn enacted(&self) -> &[H256] {
&self.enacted
}
/// All non-duplicate retracted blocks.
pub fn retracted(&self) -> &[H256] {
&self.retracted
}
/// All blocks in the route.
pub fn route(&self) -> &[(H256, ChainRouteType)] {
&self.route
}
}
/// Represents what has to be handled by actor listening to chain events
pub trait ChainNotify : Send + Sync {
/// fires when chain has new blocks.
@@ -35,12 +121,11 @@ pub trait ChainNotify : Send + Sync {
&self,
_imported: Vec<H256>,
_invalid: Vec<H256>,
_enacted: Vec<H256>,
_retracted: Vec<H256>,
_route: ChainRoute,
_sealed: Vec<H256>,
// Block bytes.
_proposed: Vec<Bytes>,
_duration: u64,
_duration: Duration,
) {
// does nothing by default
}

View File

@@ -14,12 +14,12 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{HashSet, HashMap, BTreeMap, BTreeSet, VecDeque};
use std::collections::{HashSet, BTreeMap, BTreeSet, VecDeque};
use std::fmt;
use std::str::FromStr;
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::time::{Instant};
use std::time::{Instant, Duration};
// util
use hash::keccak;
@@ -33,7 +33,7 @@ use util_error::UtilError;
// other
use ethereum_types::{H256, Address, U256};
use block::{IsBlock, LockedBlock, Drain, ClosedBlock, OpenBlock, enact_verified, SealedBlock};
use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute, TransactionAddress};
use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute, TransactionAddress};
use client::ancient_import::AncientVerifier;
use client::Error as ClientError;
use client::{
@@ -46,8 +46,8 @@ use client::{
use client::{
BlockId, TransactionId, UncleId, TraceId, ClientConfig, BlockChainClient,
TraceFilter, CallAnalytics, BlockImportError, Mode,
ChainNotify, PruningInfo, ProvingBlockChainClient, EngineInfo, ChainMessageType,
IoClient,
ChainMessageType, ChainNotify, ChainRoute, PruningInfo, ProvingBlockChainClient,
EngineInfo, IoClient,
};
use encoded;
use engines::{EthEngine, EpochTransition};
@@ -125,7 +125,7 @@ impl<'a> ::std::ops::Sub<&'a ClientReport> for ClientReport {
self.blocks_imported -= other.blocks_imported;
self.transactions_applied -= other.transactions_applied;
self.gas_processed = self.gas_processed - other.gas_processed;
self.state_db_mem = higher_mem - lower_mem;
self.state_db_mem = higher_mem - lower_mem;
self
}
@@ -257,32 +257,6 @@ impl Importer {
})
}
fn calculate_enacted_retracted(&self, import_results: &[ImportRoute]) -> (Vec<H256>, Vec<H256>) {
fn map_to_vec(map: Vec<(H256, bool)>) -> Vec<H256> {
map.into_iter().map(|(k, _v)| k).collect()
}
// In ImportRoute we get all the blocks that have been enacted and retracted by single insert.
// Because we are doing multiple inserts some of the blocks that were enacted in import `k`
// could be retracted in import `k+1`. This is why to understand if after all inserts
// the block is enacted or retracted we iterate over all routes and at the end final state
// will be in the hashmap
let map = import_results.iter().fold(HashMap::new(), |mut map, route| {
for hash in &route.enacted {
map.insert(hash.clone(), true);
}
for hash in &route.retracted {
map.insert(hash.clone(), false);
}
map
});
// Split to enacted retracted (using hashmap value)
let (enacted, retracted) = map.into_iter().partition(|&(_k, v)| v);
// And convert tuples to keys
(map_to_vec(enacted), map_to_vec(retracted))
}
/// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self, client: &Client) -> usize {
@@ -343,27 +317,22 @@ impl Importer {
self.block_queue.mark_as_bad(&invalid_blocks);
}
let is_empty = self.block_queue.mark_as_good(&imported_blocks);
let duration_ns = {
let elapsed = start.elapsed();
elapsed.as_secs() * 1_000_000_000 + elapsed.subsec_nanos() as u64
};
(imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, duration_ns, is_empty)
(imported_blocks, import_results, invalid_blocks, imported, proposed_blocks, start.elapsed(), is_empty)
};
{
if !imported_blocks.is_empty() && is_empty {
let (enacted, retracted) = self.calculate_enacted_retracted(&import_results);
let route = ChainRoute::from(import_results.as_ref());
if is_empty {
self.miner.chain_new_blocks(client, &imported_blocks, &invalid_blocks, &enacted, &retracted, false);
self.miner.chain_new_blocks(client, &imported_blocks, &invalid_blocks, route.enacted(), route.retracted(), false);
}
client.notify(|notify| {
notify.new_blocks(
imported_blocks.clone(),
invalid_blocks.clone(),
enacted.clone(),
retracted.clone(),
route.clone(),
Vec::new(),
proposed_blocks.clone(),
duration,
@@ -1030,7 +999,8 @@ impl Client {
/// Otherwise, this can fail (but may not) if the DB prunes state.
pub fn state_at_beginning(&self, id: BlockId) -> Option<State<StateDB>> {
match self.block_number(id) {
None | Some(0) => None,
None => None,
Some(0) => self.state_at(id),
Some(n) => self.state_at(BlockId::Number(n - 1)),
}
}
@@ -1446,7 +1416,7 @@ impl Call for Client {
}
fn estimate_gas(&self, t: &SignedTransaction, state: &Self::State, header: &Header) -> Result<U256, CallError> {
let (mut upper, max_upper, env_info) = {
let (mut upper, max_upper, env_info) = {
let init = *header.gas_limit();
let max = init * U256::from(10);
@@ -2041,15 +2011,16 @@ impl IoClient for Client {
let first = queued.write().1.pop_front();
if let Some((header, block_bytes, receipts_bytes)) = first {
let hash = header.hash();
client.importer.import_old_block(
let result = client.importer.import_old_block(
&header,
&block_bytes,
&receipts_bytes,
&**client.db.read(),
&*client.chain.read()
).ok().map_or((), |e| {
&*client.chain.read(),
);
if let Err(e) = result {
error!(target: "client", "Error importing ancient block: {}", e);
});
}
// remove from pending
queued.write().0.remove(&hash);
} else {
@@ -2171,20 +2142,16 @@ impl ImportSealedBlock for Client {
self.state_db.write().sync_cache(&route.enacted, &route.retracted, false);
route
};
let (enacted, retracted) = self.importer.calculate_enacted_retracted(&[route]);
self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted, self.engine.seals_internally().is_some());
let route = ChainRoute::from([route].as_ref());
self.importer.miner.chain_new_blocks(self, &[h.clone()], &[], route.enacted(), route.retracted(), true);
self.notify(|notify| {
notify.new_blocks(
vec![h.clone()],
vec![],
enacted.clone(),
retracted.clone(),
route.clone(),
vec![h.clone()],
vec![],
{
let elapsed = start.elapsed();
elapsed.as_secs() * 1_000_000_000 + elapsed.subsec_nanos() as u64
},
start.elapsed(),
);
});
self.db.read().flush().expect("DB flush failed.");
@@ -2194,15 +2161,15 @@ impl ImportSealedBlock for Client {
impl BroadcastProposalBlock for Client {
fn broadcast_proposal_block(&self, block: SealedBlock) {
const DURATION_ZERO: Duration = Duration::from_millis(0);
self.notify(|notify| {
notify.new_blocks(
vec![],
vec![],
vec![],
vec![],
ChainRoute::default(),
vec![],
vec![block.rlp_bytes()],
0,
DURATION_ZERO,
);
});
}

View File

@@ -31,7 +31,7 @@ pub use self::error::Error;
pub use self::evm_test_client::{EvmTestClient, EvmTestError, TransactResult};
pub use self::io_message::ClientIoMessage;
pub use self::test_client::{TestBlockChainClient, EachBlockWith};
pub use self::chain_notify::{ChainNotify, ChainMessageType};
pub use self::chain_notify::{ChainNotify, ChainRoute, ChainRouteType, ChainMessageType};
pub use self::traits::{
Nonce, Balance, ChainInfo, BlockInfo, ReopenBlock, PrepareOpenBlock, CallContract, TransactionInfo, RegistryInfo, ScheduleInfo, ImportSealedBlock, BroadcastProposalBlock, ImportBlock,
StateOrBlock, StateClient, Call, EngineInfo, AccountData, BlockChain, BlockProducer, SealedBlockImporter

View File

@@ -382,12 +382,16 @@ impl Decodable for SealedEmptyStep {
}
}
struct PermissionedStep {
inner: Step,
can_propose: AtomicBool,
}
/// Engine using `AuthorityRound` proof-of-authority BFT consensus.
pub struct AuthorityRound {
transition_service: IoService<()>,
step: Arc<Step>,
can_propose: AtomicBool,
client: RwLock<Option<Weak<EngineClient>>>,
step: Arc<PermissionedStep>,
client: Arc<RwLock<Option<Weak<EngineClient>>>>,
signer: RwLock<EngineSigner>,
validators: Box<ValidatorSet>,
validate_score_transition: u64,
@@ -407,7 +411,7 @@ pub struct AuthorityRound {
// header-chain validator.
struct EpochVerifier {
step: Arc<Step>,
step: Arc<PermissionedStep>,
subchain_validators: SimpleList,
empty_steps_transition: u64,
}
@@ -415,7 +419,7 @@ struct EpochVerifier {
impl super::EpochVerifier<EthereumMachine> for EpochVerifier {
fn verify_light(&self, header: &Header) -> Result<(), Error> {
// Validate the timestamp
verify_timestamp(&*self.step, header_step(header, self.empty_steps_transition)?)?;
verify_timestamp(&self.step.inner, header_step(header, self.empty_steps_transition)?)?;
// always check the seal since it's fast.
// nothing heavier to do.
verify_external(header, &self.subchain_validators, self.empty_steps_transition)
@@ -615,13 +619,15 @@ impl AuthorityRound {
let engine = Arc::new(
AuthorityRound {
transition_service: IoService::<()>::start()?,
step: Arc::new(Step {
inner: AtomicUsize::new(initial_step),
calibrate: our_params.start_step.is_none(),
duration: our_params.step_duration,
step: Arc::new(PermissionedStep {
inner: Step {
inner: AtomicUsize::new(initial_step),
calibrate: our_params.start_step.is_none(),
duration: our_params.step_duration,
},
can_propose: AtomicBool::new(true),
}),
can_propose: AtomicBool::new(true),
client: RwLock::new(None),
client: Arc::new(RwLock::new(None)),
signer: Default::default(),
validators: our_params.validators,
validate_score_transition: our_params.validate_score_transition,
@@ -641,7 +647,10 @@ impl AuthorityRound {
// Do not initialize timeouts for tests.
if should_timeout {
let handler = TransitionHandler { engine: Arc::downgrade(&engine) };
let handler = TransitionHandler {
step: engine.step.clone(),
client: engine.client.clone(),
};
engine.transition_service.register_handler(Arc::new(handler))?;
}
Ok(engine)
@@ -667,7 +676,7 @@ impl AuthorityRound {
}
fn generate_empty_step(&self, parent_hash: &H256) {
let step = self.step.load();
let step = self.step.inner.load();
let empty_step_rlp = empty_step_rlp(step, parent_hash);
if let Ok(signature) = self.sign(keccak(&empty_step_rlp)).map(Into::into) {
@@ -699,34 +708,37 @@ fn unix_now() -> Duration {
}
struct TransitionHandler {
engine: Weak<AuthorityRound>,
step: Arc<PermissionedStep>,
client: Arc<RwLock<Option<Weak<EngineClient>>>>,
}
const ENGINE_TIMEOUT_TOKEN: TimerToken = 23;
impl IoHandler<()> for TransitionHandler {
fn initialize(&self, io: &IoContext<()>) {
if let Some(engine) = self.engine.upgrade() {
let remaining = engine.step.duration_remaining().as_millis();
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(remaining))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to start consensus step timer: {}.", e))
}
let remaining = self.step.inner.duration_remaining().as_millis();
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(remaining))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to start consensus step timer: {}.", e))
}
fn timeout(&self, io: &IoContext<()>, timer: TimerToken) {
if timer == ENGINE_TIMEOUT_TOKEN {
if let Some(engine) = self.engine.upgrade() {
// NOTE we might be lagging by couple of steps in case the timeout
// has not been called fast enough.
// Make sure to advance up to the actual step.
while engine.step.duration_remaining().as_millis() == 0 {
engine.step();
// NOTE we might be lagging by couple of steps in case the timeout
// has not been called fast enough.
// Make sure to advance up to the actual step.
while self.step.inner.duration_remaining().as_millis() == 0 {
self.step.inner.increment();
self.step.can_propose.store(true, AtomicOrdering::SeqCst);
if let Some(ref weak) = *self.client.read() {
if let Some(c) = weak.upgrade() {
c.update_sealing();
}
}
let next_run_at = engine.step.duration_remaining().as_millis() >> 2;
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e))
}
let next_run_at = self.step.inner.duration_remaining().as_millis() >> 2;
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at))
.unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e))
}
}
}
@@ -743,8 +755,8 @@ impl Engine<EthereumMachine> for AuthorityRound {
}
fn step(&self) {
self.step.increment();
self.can_propose.store(true, AtomicOrdering::SeqCst);
self.step.inner.increment();
self.step.can_propose.store(true, AtomicOrdering::SeqCst);
if let Some(ref weak) = *self.client.read() {
if let Some(c) = weak.upgrade() {
c.update_sealing();
@@ -791,7 +803,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
fn populate_from_parent(&self, header: &mut Header, parent: &Header) {
let parent_step = header_step(parent, self.empty_steps_transition).expect("Header has been verified; qed");
let current_step = self.step.load();
let current_step = self.step.inner.load();
let current_empty_steps_len = if header.number() >= self.empty_steps_transition {
self.empty_steps(parent_step.into(), current_step.into(), parent.hash()).len()
@@ -817,7 +829,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
let empty_step: EmptyStep = rlp.as_val().map_err(fmt_err)?;;
if empty_step.verify(&*self.validators).unwrap_or(false) {
if self.step.check_future(empty_step.step).is_ok() {
if self.step.inner.check_future(empty_step.step).is_ok() {
trace!(target: "engine", "handle_message: received empty step message {:?}", empty_step);
self.handle_empty_step_message(empty_step);
} else {
@@ -837,7 +849,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
fn generate_seal(&self, block: &ExecutedBlock, parent: &Header) -> Seal {
// first check to avoid generating signature most of the time
// (but there's still a race to the `compare_and_swap`)
if !self.can_propose.load(AtomicOrdering::SeqCst) {
if !self.step.can_propose.load(AtomicOrdering::SeqCst) {
trace!(target: "engine", "Aborting seal generation. Can't propose.");
return Seal::None;
}
@@ -846,7 +858,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
let parent_step: U256 = header_step(parent, self.empty_steps_transition)
.expect("Header has been verified; qed").into();
let step = self.step.load();
let step = self.step.inner.load();
// filter messages from old and future steps and different parents
let empty_steps = if header.number() >= self.empty_steps_transition {
@@ -923,7 +935,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
trace!(target: "engine", "generate_seal: Issuing a block for step {}.", step);
// only issue the seal if we were the first to reach the compare_and_swap.
if self.can_propose.compare_and_swap(true, false, AtomicOrdering::SeqCst) {
if self.step.can_propose.compare_and_swap(true, false, AtomicOrdering::SeqCst) {
self.clear_empty_steps(parent_step);
@@ -999,7 +1011,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
.decode()?;
let parent_step = header_step(&parent, self.empty_steps_transition)?;
let current_step = self.step.load();
let current_step = self.step.inner.load();
self.empty_steps(parent_step.into(), current_step.into(), parent.hash())
} else {
// we're verifying a block, extract empty steps from the seal
@@ -1049,7 +1061,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
// If yes then probably benign reporting needs to be moved further in the verification.
let set_number = header.number();
match verify_timestamp(&*self.step, header_step(header, self.empty_steps_transition)?) {
match verify_timestamp(&self.step.inner, header_step(header, self.empty_steps_transition)?) {
Err(BlockError::InvalidSeal) => {
self.validators.report_benign(header.author(), set_number, header.number());
Err(BlockError::InvalidSeal.into())
@@ -1291,7 +1303,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
// This way, upon encountering an epoch change, the proposer from the
// new set will be forced to wait until the next step to avoid sealing a
// block that breaks the invariant that the parent's step < the block's step.
self.can_propose.store(false, AtomicOrdering::SeqCst);
self.step.can_propose.store(false, AtomicOrdering::SeqCst);
return Some(combine_proofs(signal_number, &pending.proof, &*finality_proof));
}
}

View File

@@ -126,6 +126,8 @@ pub struct MinerOptions {
pub tx_queue_strategy: PrioritizationStrategy,
/// Simple senders penalization.
pub tx_queue_penalization: Penalization,
/// Do we want to mark transactions recieved locally (e.g. RPC) as local if we don't have the sending account?
pub tx_queue_no_unfamiliar_locals: bool,
/// Do we refuse to accept service transactions even if sender is certified.
pub refuse_service_transactions: bool,
/// Transaction pool limits.
@@ -149,6 +151,7 @@ impl Default for MinerOptions {
infinite_pending_block: false,
tx_queue_strategy: PrioritizationStrategy::GasPriceOnly,
tx_queue_penalization: Penalization::Disabled,
tx_queue_no_unfamiliar_locals: false,
refuse_service_transactions: false,
pool_limits: pool::Options {
max_count: 8_192,
@@ -688,6 +691,20 @@ impl Miner {
// Return if we restarted
prepare_new
}
/// Prepare pending block, check whether sealing is needed, and then update sealing.
fn prepare_and_update_sealing<C: miner::BlockChainClient>(&self, chain: &C) {
use miner::MinerService;
// Make sure to do it after transaction is imported and lock is dropped.
// We need to create pending block and enable sealing.
if self.engine.seals_internally().unwrap_or(false) || !self.prepare_pending_block(chain) {
// If new block has not been prepared (means we already had one)
// or Engine might be able to seal internally,
// we need to update sealing.
self.update_sealing(chain);
}
}
}
const SEALING_TIMEOUT_IN_BLOCKS : u64 = 5;
@@ -754,12 +771,12 @@ impl miner::MinerService for Miner {
transactions.into_iter().map(pool::verifier::Transaction::Unverified).collect(),
);
// --------------------------------------------------------------------------
// | NOTE Code below requires sealing locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
if !results.is_empty() && self.options.reseal_on_external_tx && self.sealing.lock().reseal_allowed() {
// --------------------------------------------------------------------------
// | NOTE Code below requires sealing locks. |
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
self.update_sealing(chain);
self.prepare_and_update_sealing(chain);
}
results
@@ -768,8 +785,9 @@ impl miner::MinerService for Miner {
fn import_own_transaction<C: miner::BlockChainClient>(
&self,
chain: &C,
pending: PendingTransaction,
pending: PendingTransaction
) -> Result<(), transaction::Error> {
// note: you may want to use `import_claimed_local_transaction` instead of this one.
trace!(target: "own_tx", "Importing transaction: {:?}", pending);
@@ -784,19 +802,34 @@ impl miner::MinerService for Miner {
// | Make sure to release the locks before calling that method. |
// --------------------------------------------------------------------------
if imported.is_ok() && self.options.reseal_on_own_tx && self.sealing.lock().reseal_allowed() {
// Make sure to do it after transaction is imported and lock is droped.
// We need to create pending block and enable sealing.
if self.engine.seals_internally().unwrap_or(false) || !self.prepare_pending_block(chain) {
// If new block has not been prepared (means we already had one)
// or Engine might be able to seal internally,
// we need to update sealing.
self.update_sealing(chain);
}
self.prepare_and_update_sealing(chain);
}
imported
}
fn import_claimed_local_transaction<C: miner::BlockChainClient>(
&self,
chain: &C,
pending: PendingTransaction,
trusted: bool
) -> Result<(), transaction::Error> {
// treat the tx as local if the option is enabled, or if we have the account
let sender = pending.sender();
let treat_as_local = trusted
|| !self.options.tx_queue_no_unfamiliar_locals
|| self.accounts.as_ref().map(|accts| accts.has_account(sender)).unwrap_or(false);
if treat_as_local {
self.import_own_transaction(chain, pending)
} else {
// We want to replicate behaviour for external transactions if we're not going to treat
// this as local. This is important with regards to sealing blocks
self.import_external_transactions(chain, vec![pending.transaction.into()])
.pop().expect("one result per tx, as in `import_own_transaction`")
}
}
fn local_transactions(&self) -> BTreeMap<H256, pool::local_transactions::Status> {
self.transaction_queue.local_transactions()
}
@@ -1133,6 +1166,7 @@ mod tests {
infinite_pending_block: false,
tx_queue_penalization: Penalization::Disabled,
tx_queue_strategy: PrioritizationStrategy::GasPriceOnly,
tx_queue_no_unfamiliar_locals: false,
refuse_service_transactions: false,
pool_limits: Default::default(),
pool_verification_options: pool::verifier::Options {
@@ -1147,8 +1181,10 @@ mod tests {
)
}
const TEST_CHAIN_ID: u64 = 2;
fn transaction() -> SignedTransaction {
transaction_with_chain_id(2)
transaction_with_chain_id(TEST_CHAIN_ID)
}
fn transaction_with_chain_id(chain_id: u64) -> SignedTransaction {
@@ -1222,6 +1258,53 @@ mod tests {
assert_eq!(miner.ready_transactions(&client).len(), 1);
}
#[test]
fn should_treat_unfamiliar_locals_selectively() {
// given
let keypair = Random.generate().unwrap();
let client = TestBlockChainClient::default();
let account_provider = AccountProvider::transient_provider();
account_provider.insert_account(keypair.secret().clone(), "").expect("can add accounts to the provider we just created");
let miner = Miner::new(
MinerOptions {
tx_queue_no_unfamiliar_locals: true,
..miner().options
},
GasPricer::new_fixed(0u64.into()),
&Spec::new_test(),
Some(Arc::new(account_provider)),
);
let transaction = transaction();
let best_block = 0;
// when
// This transaction should not be marked as local because our account_provider doesn't have the sender
let res = miner.import_claimed_local_transaction(&client, PendingTransaction::new(transaction.clone(), None), false);
// then
// Check the same conditions as `should_import_external_transaction` first. Behaviour should be identical.
// That is: it's treated as though we added it through `import_external_transactions`
assert_eq!(res.unwrap(), ());
assert_eq!(miner.pending_transactions(best_block), None);
assert_eq!(miner.pending_receipts(best_block), None);
assert_eq!(miner.ready_transactions(&client).len(), 0);
assert!(miner.prepare_pending_block(&client));
assert_eq!(miner.ready_transactions(&client).len(), 1);
// when - 2nd part: create a local transaction from account_provider.
// Borrow the transaction used before & sign with our generated keypair.
let local_transaction = transaction.deconstruct().0.as_unsigned().clone().sign(keypair.secret(), Some(TEST_CHAIN_ID));
let res2 = miner.import_claimed_local_transaction(&client, PendingTransaction::new(local_transaction, None), false);
// then - 2nd part: we add on the results from the last pending block.
// This is borrowed from `should_make_pending_block_when_importing_own_transaction` and slightly modified.
assert_eq!(res2.unwrap(), ());
assert_eq!(miner.pending_transactions(best_block).unwrap().len(), 2);
assert_eq!(miner.pending_receipts(best_block).unwrap().len(), 2);
assert_eq!(miner.ready_transactions(&client).len(), 2);
assert!(!miner.prepare_pending_block(&client));
}
#[test]
fn should_not_seal_unless_enabled() {
let miner = miner();

View File

@@ -139,6 +139,12 @@ pub trait MinerService : Send + Sync {
-> Result<(), transaction::Error>
where C: BlockChainClient;
/// Imports transactions from potentially external sources, with behaviour determined
/// by the config flag `tx_queue_allow_unfamiliar_locals`
fn import_claimed_local_transaction<C>(&self, chain: &C, transaction: PendingTransaction, trusted: bool)
-> Result<(), transaction::Error>
where C: BlockChainClient;
/// Removes transaction from the pool.
///
/// Attempts to "cancel" a transaction. If it was not propagated yet (or not accepted by other peers)

View File

@@ -124,7 +124,7 @@ impl<'a, C: 'a> pool::client::Client for PoolClient<'a, C> where
pool::client::AccountDetails {
nonce: self.cached_nonces.account_nonce(address),
balance: self.chain.latest_balance(address),
is_local: self.accounts.map_or(false, |accounts| accounts.has_account(*address).unwrap_or(false)),
is_local: self.accounts.map_or(false, |accounts| accounts.has_account(*address)),
}
}

View File

@@ -215,7 +215,7 @@ fn fixed_to_contract_only() {
secret!("dog42"),
]);
assert!(provider.has_account(*RICH_ADDR).unwrap());
assert!(provider.has_account(*RICH_ADDR));
let client = make_chain(provider, 3, vec![
Transition::Manual(3, vec![addrs[2], addrs[3], addrs[5], addrs[7]]),
@@ -248,7 +248,7 @@ fn fixed_to_contract_to_contract() {
secret!("dog42"),
]);
assert!(provider.has_account(*RICH_ADDR).unwrap());
assert!(provider.has_account(*RICH_ADDR));
let client = make_chain(provider, 3, vec![
Transition::Manual(3, vec![addrs[2], addrs[3], addrs[5], addrs[7]]),

View File

@@ -17,14 +17,14 @@
//! Watcher for snapshot-related chain events.
use parking_lot::Mutex;
use client::{BlockInfo, Client, ChainNotify, ClientIoMessage};
use client::{BlockInfo, Client, ChainNotify, ChainRoute, ClientIoMessage};
use ids::BlockId;
use io::IoChannel;
use ethereum_types::H256;
use bytes::Bytes;
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
// helper trait for transforming hashes to numbers and checking if syncing.
trait Oracle: Send + Sync {
@@ -103,11 +103,10 @@ impl ChainNotify for Watcher {
&self,
imported: Vec<H256>,
_: Vec<H256>,
_: Vec<H256>,
_: Vec<H256>,
_: ChainRoute,
_: Vec<H256>,
_: Vec<Bytes>,
_duration: u64)
_duration: Duration)
{
if self.oracle.is_major_importing() { return }
@@ -131,11 +130,12 @@ impl ChainNotify for Watcher {
mod tests {
use super::{Broadcast, Oracle, Watcher};
use client::ChainNotify;
use client::{ChainNotify, ChainRoute};
use ethereum_types::{H256, U256};
use std::collections::HashMap;
use std::time::Duration;
struct TestOracle(HashMap<H256, u64>);
@@ -158,6 +158,8 @@ mod tests {
// helper harness for tests which expect a notification.
fn harness(numbers: Vec<u64>, period: u64, history: u64, expected: Option<u64>) {
const DURATION_ZERO: Duration = Duration::from_millis(0);
let hashes: Vec<_> = numbers.clone().into_iter().map(|x| H256::from(U256::from(x))).collect();
let map = hashes.clone().into_iter().zip(numbers).collect();
@@ -171,11 +173,10 @@ mod tests {
watcher.new_blocks(
hashes,
vec![],
ChainRoute::default(),
vec![],
vec![],
vec![],
vec![],
0,
DURATION_ZERO,
);
}

View File

@@ -39,10 +39,10 @@ impl<'a, 'view> ViewRlp<'a> where 'a : 'view {
/// Returns a new instance replacing existing rlp with new rlp, maintaining debug info
fn new_from_rlp(&self, rlp: Rlp<'a>) -> Self {
ViewRlp {
rlp,
ViewRlp {
rlp,
file: self.file,
line: self.line
line: self.line
}
}
@@ -53,7 +53,12 @@ impl<'a, 'view> ViewRlp<'a> where 'a : 'view {
}
fn expect_valid_rlp<T>(&self, r: Result<T, DecoderError>) -> T {
r.expect(&format!("View rlp is trusted and should be valid. Constructed in {} on line {}", self.file, self.line))
r.unwrap_or_else(|e| panic!(
"View rlp is trusted and should be valid. Constructed in {} on line {}: {}",
self.file,
self.line,
e
))
}
/// Returns rlp at the given index, panics if no rlp at that index
@@ -75,7 +80,7 @@ impl<'a, 'view> ViewRlp<'a> where 'a : 'view {
/// Returns decoded value at the given index, panics not present or valid at that index
pub fn val_at<T>(&self, index: usize) -> T where T : Decodable {
self.expect_valid_rlp(self.rlp.val_at(index))
}
}
/// Returns decoded list of values, panics if rlp is invalid
pub fn list_at<T>(&self, index: usize) -> Vec<T> where T: Decodable {