Beta: 1.11.6 backports (#9015)

* parity-version: bump beta to 1.11.6

* scripts: remove md5 checksums (#8884)

* Add support for --chain tobalaba

* Convert indents to tabs :)

* Fixes for misbehavior reporting in AuthorityRound (#8998)

* aura: only report after checking for repeated skipped primaries

* aura: refactor duplicate code for getting epoch validator set

* aura: verify_external: report on validator set contract instance

* aura: use correct validator set epoch number when reporting

* aura: use epoch set when verifying blocks

* aura: report skipped primaries when generating seal

* aura: handle immediate transitions

* aura: don't report skipped steps from genesis to first block

* aura: fix reporting test

* aura: refactor duplicate code to handle immediate_transitions

* aura: let reporting fail on verify_block_basic

* aura: add comment about possible failure of reporting

* Only return error log for rustls (#9025)

* Transaction Pool improvements (#8470)

* Don't use ethereum_types in transaction pool.

* Hide internal insertion_id.

* Fix tests.

* Review grumbles.

* Improve should_replace on NonceAndGasPrice (#8980)

* Additional tests for NonceAndGasPrice::should_replace.

* Fix should_replace in the distinct sender case.

* Use natural priority ordering to simplify should_replace.

* Minimal effective gas price in the queue (#8934)

* Minimal effective gas price.

* Fix naming, add test

* Fix minimal entry score and add test.

* Fix worst_transaction.

* Remove effective gas price threshold.

* Don't leak gas_price decisions out of Scoring.

* Never drop local transactions from different senders. (#9002)

* Recently rejected cache for transaction queue (#9005)

* Store recently rejected transactions.

* Don't cache AlreadyImported rejections.

* Make the size of transaction verification queue dependent on pool size.

* Add a test for recently rejected.

* Fix logging for recently rejected.

* Make rejection cache smaller.

* obsolete test removed

* obsolete test removed

* Construct cache with_capacity.

* Optimize pending transactions filter (#9026)

* rpc: return unordered transactions in pending transactions filter

* ethcore: use LruCache for nonce cache

Only clear the nonce cache when a block is retracted

* Revert "ethcore: use LruCache for nonce cache"

This reverts commit b382c19abdb9985be1724c3b8cde83906da07d68.

* Use only cached nonces when computing pending hashes.

* Give filters their own locks, so that they don't block one another.

* Fix pending transaction count if not sealing.

* Clear cache only when block is enacted.

* Fix RPC tests.

* Address review comments.

* A last bunch of txqueue performance optimizations (#9024)

* Clear cache only when block is enacted.

* Add tracing for cull.

* Cull split.

* Cull after creating pending block.

* Add constant, remove sync::read tracing.

* Reset debug.

* Remove excessive tracing.

* Use struct for NonceCache.

* Fix build

* Remove warnings.

* Fix build again.

* miner: add missing macro use for trace_time

* ci: remove md5 merge leftovers
This commit is contained in:
Afri Schoedon 2018-07-09 08:39:46 +02:00 committed by GitHub
parent 0487c5b7a7
commit 4ba600fcc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
52 changed files with 1356 additions and 581 deletions

12
Cargo.lock generated
View File

@ -1963,7 +1963,7 @@ dependencies = [
[[package]]
name = "parity"
version = "1.11.5"
version = "1.11.6"
dependencies = [
"ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)",
"atty 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2014,7 +2014,7 @@ dependencies = [
"parity-rpc 1.11.0",
"parity-rpc-client 1.4.0",
"parity-updater 1.11.0",
"parity-version 1.11.5",
"parity-version 1.11.6",
"parity-whisper 0.1.0",
"parking_lot 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
"path 0.1.0",
@ -2062,7 +2062,7 @@ dependencies = [
"parity-reactor 0.1.0",
"parity-ui 1.11.0",
"parity-ui-deprecation 1.10.0",
"parity-version 1.11.5",
"parity-version 1.11.6",
"parking_lot 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"registrar 0.0.1",
@ -2204,7 +2204,7 @@ dependencies = [
"order-stat 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-reactor 0.1.0",
"parity-updater 1.11.0",
"parity-version 1.11.5",
"parity-version 1.11.6",
"parking_lot 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
"patricia-trie 0.1.0",
"pretty_assertions 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2325,7 +2325,7 @@ dependencies = [
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"matches 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-hash-fetch 1.11.0",
"parity-version 1.11.5",
"parity-version 1.11.6",
"parking_lot 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
"path 0.1.0",
"rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2336,7 +2336,7 @@ dependencies = [
[[package]]
name = "parity-version"
version = "1.11.5"
version = "1.11.6"
dependencies = [
"ethcore-bytes 0.1.0",
"rlp 0.2.1",

View File

@ -2,7 +2,7 @@
description = "Parity Ethereum client"
name = "parity"
# NOTE Make sure to update util/version/Cargo.toml as well
version = "1.11.5"
version = "1.11.6"
license = "GPL-3.0"
authors = ["Parity Technologies <admin@parity.io>"]

View File

@ -82,7 +82,7 @@ use ethcore::client::{
Client, ChainNotify, ChainRoute, ChainMessageType, ClientIoMessage, BlockId, CallContract
};
use ethcore::account_provider::AccountProvider;
use ethcore::miner::{self, Miner, MinerService};
use ethcore::miner::{self, Miner, MinerService, pool_client::NonceCache};
use ethcore::trace::{Tracer, VMTracer};
use rustc_hex::FromHex;
@ -94,6 +94,9 @@ use_contract!(private, "PrivateContract", "res/private.json");
/// Initialization vector length.
const INIT_VEC_LEN: usize = 16;
/// Size of nonce cache
const NONCE_CACHE_SIZE: usize = 128;
/// Configurtion for private transaction provider
#[derive(Default, PartialEq, Debug, Clone)]
pub struct ProviderConfig {
@ -243,7 +246,7 @@ impl Provider where {
Ok(original_transaction)
}
fn pool_client<'a>(&'a self, nonce_cache: &'a RwLock<HashMap<Address, U256>>) -> miner::pool_client::PoolClient<'a, Client> {
fn pool_client<'a>(&'a self, nonce_cache: &'a NonceCache) -> miner::pool_client::PoolClient<'a, Client> {
let engine = self.client.engine();
let refuse_service_transactions = true;
miner::pool_client::PoolClient::new(
@ -262,7 +265,7 @@ impl Provider where {
/// can be replaced with a single `drain()` method instead.
/// Thanks to this we also don't really need to lock the entire verification for the time of execution.
fn process_queue(&self) -> Result<(), Error> {
let nonce_cache = Default::default();
let nonce_cache = NonceCache::new(NONCE_CACHE_SIZE);
let mut verification_queue = self.transactions_for_verification.lock();
let ready_transactions = verification_queue.ready_transactions(self.pool_client(&nonce_cache));
for transaction in ready_transactions {
@ -583,7 +586,7 @@ impl Importer for Arc<Provider> {
trace!("Validating transaction: {:?}", original_tx);
// Verify with the first account available
trace!("The following account will be used for verification: {:?}", validation_account);
let nonce_cache = Default::default();
let nonce_cache = NonceCache::new(NONCE_CACHE_SIZE);
self.transactions_for_verification.lock().add_transaction(
original_tx,
contract,

File diff suppressed because one or more lines are too long

View File

@ -86,7 +86,6 @@ pub use verification::queue::QueueInfo as BlockQueueInfo;
use_contract!(registry, "Registry", "res/contracts/registrar.json");
const MAX_TX_QUEUE_SIZE: usize = 4096;
const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096;
// Max number of blocks imported at once.
const MAX_ANCIENT_BLOCKS_TO_IMPORT: usize = 4;
@ -710,13 +709,12 @@ impl Client {
tracedb: tracedb,
engine: engine,
pruning: config.pruning.clone(),
config: config,
db: RwLock::new(db),
db: RwLock::new(db.clone()),
state_db: RwLock::new(state_db),
report: RwLock::new(Default::default()),
io_channel: Mutex::new(message_channel),
notify: RwLock::new(Vec::new()),
queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE),
queue_transactions: IoChannelQueue::new(config.transaction_verification_queue_size),
queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
queued_ancient_blocks: Default::default(),
ancient_blocks_import_lock: Default::default(),
@ -729,6 +727,7 @@ impl Client {
registrar_address,
exit_handler: Mutex::new(None),
importer,
config,
});
// prune old states.

View File

@ -71,12 +71,6 @@ pub enum Mode {
Off,
}
impl Default for Mode {
fn default() -> Self {
Mode::Active
}
}
impl Display for Mode {
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
match *self {
@ -112,7 +106,7 @@ impl From<IpcMode> for Mode {
/// Client configuration. Includes configs for all sub-systems.
#[derive(Debug, PartialEq, Default)]
#[derive(Debug, PartialEq, Clone)]
pub struct ClientConfig {
/// Block queue configuration.
pub queue: QueueConfig,
@ -150,11 +144,39 @@ pub struct ClientConfig {
pub history_mem: usize,
/// Check seal valididity on block import
pub check_seal: bool,
/// Maximal number of transactions queued for verification in a separate thread.
pub transaction_verification_queue_size: usize,
}
impl Default for ClientConfig {
fn default() -> Self {
let mb = 1024 * 1024;
ClientConfig {
queue: Default::default(),
blockchain: Default::default(),
tracing: Default::default(),
vm_type: Default::default(),
fat_db: false,
pruning: journaldb::Algorithm::OverlayRecent,
name: "default".into(),
db_cache_size: None,
db_compaction: Default::default(),
db_wal: true,
mode: Mode::Active,
spec_name: "".into(),
verifier_type: VerifierType::Canon,
state_cache_size: 1 * mb,
jump_table_size: 1 * mb,
history: 64,
history_mem: 32 * mb,
check_seal: true,
transaction_verification_queue_size: 8192,
}
}
}
#[cfg(test)]
mod test {
use super::{DatabaseCompactionProfile, Mode};
use super::{DatabaseCompactionProfile};
#[test]
fn test_default_compaction_profile() {
@ -167,9 +189,4 @@ mod test {
assert_eq!(DatabaseCompactionProfile::SSD, "ssd".parse().unwrap());
assert_eq!(DatabaseCompactionProfile::HDD, "hdd".parse().unwrap());
}
#[test]
fn test_mode_default() {
assert_eq!(Mode::default(), Mode::Active);
}
}

View File

@ -16,12 +16,13 @@
//! A blockchain engine that supports a non-instant BFT proof-of-authority.
use std::collections::{BTreeMap, HashSet};
use std::fmt;
use std::iter::FromIterator;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::sync::{Weak, Arc};
use std::time::{UNIX_EPOCH, SystemTime, Duration};
use std::collections::{BTreeMap, HashSet};
use std::iter::FromIterator;
use account_provider::AccountProvider;
use block::*;
@ -29,7 +30,7 @@ use client::EngineClient;
use engines::{Engine, Seal, EngineError, ConstructedVerifier};
use engines::block_reward;
use engines::block_reward::{BlockRewardContract, RewardKind};
use error::{Error, BlockError};
use error::{Error, ErrorKind, BlockError};
use ethjson;
use machine::{AuxiliaryData, Call, EthereumMachine};
use hash::keccak;
@ -575,7 +576,6 @@ fn verify_external(header: &Header, validators: &ValidatorSet, empty_steps_trans
if is_invalid_proposer {
trace!(target: "engine", "verify_block_external: bad proposer for step: {}", header_step);
validators.report_benign(header.author(), header.number(), header.number());
Err(EngineError::NotProposer(Mismatch { expected: correct_proposer, found: header.author().clone() }))?
} else {
Ok(())
@ -607,6 +607,23 @@ impl AsMillis for Duration {
}
}
// A type for storing owned or borrowed data that has a common type.
// Useful for returning either a borrow or owned data from a function.
enum CowLike<'a, A: 'a + ?Sized, B> {
Borrowed(&'a A),
Owned(B),
}
impl<'a, A: ?Sized, B> Deref for CowLike<'a, A, B> where B: AsRef<A> {
type Target = A;
fn deref(&self) -> &A {
match self {
CowLike::Borrowed(b) => b,
CowLike::Owned(o) => o.as_ref(),
}
}
}
impl AuthorityRound {
/// Create a new instance of AuthorityRound engine.
pub fn new(our_params: AuthorityRoundParams, machine: EthereumMachine) -> Result<Arc<Self>, Error> {
@ -656,6 +673,30 @@ impl AuthorityRound {
Ok(engine)
}
// fetch correct validator set for epoch at header, taking into account
// finality of previous transitions.
fn epoch_set<'a>(&'a self, header: &Header) -> Result<(CowLike<ValidatorSet, SimpleList>, BlockNumber), Error> {
Ok(if self.immediate_transitions {
(CowLike::Borrowed(&*self.validators), header.number())
} else {
let mut epoch_manager = self.epoch_manager.lock();
let client = match self.client.read().as_ref().and_then(|weak| weak.upgrade()) {
Some(client) => client,
None => {
debug!(target: "engine", "Unable to verify sig: missing client ref.");
return Err(EngineError::RequiresClient.into())
}
};
if !epoch_manager.zoom_to(&*client, &self.machine, &*self.validators, header) {
debug!(target: "engine", "Unable to zoom to epoch.");
return Err(EngineError::RequiresClient.into())
}
(CowLike::Owned(epoch_manager.validators().clone()), epoch_manager.epoch_transition_number)
})
}
fn empty_steps(&self, from_step: U256, to_step: U256, parent_hash: H256) -> Vec<EmptyStep> {
self.empty_steps.lock().iter().filter(|e| {
U256::from(e.step) > from_step &&
@ -701,6 +742,28 @@ impl AuthorityRound {
}
}
}
fn report_skipped(&self, header: &Header, current_step: usize, parent_step: usize, validators: &ValidatorSet, set_number: u64) {
// we're building on top of the genesis block so don't report any skipped steps
if header.number() == 1 {
return;
}
if let (true, Some(me)) = (current_step > parent_step + 1, self.signer.read().address()) {
debug!(target: "engine", "Author {} built block with step gap. current step: {}, parent step: {}",
header.author(), current_step, parent_step);
let mut reported = HashSet::new();
for step in parent_step + 1..current_step {
let skipped_primary = step_proposer(validators, header.parent_hash(), step);
// Do not report this signer.
if skipped_primary != me {
// Stop reporting once validators start repeating.
if !reported.insert(skipped_primary) { break; }
self.validators.report_benign(&skipped_primary, set_number, header.number());
}
}
}
}
}
fn unix_now() -> Duration {
@ -880,32 +943,15 @@ impl Engine<EthereumMachine> for AuthorityRound {
return Seal::None;
}
// fetch correct validator set for current epoch, taking into account
// finality of previous transitions.
let active_set;
let validators = if self.immediate_transitions {
&*self.validators
} else {
let mut epoch_manager = self.epoch_manager.lock();
let client = match self.client.read().as_ref().and_then(|weak| weak.upgrade()) {
Some(client) => client,
None => {
warn!(target: "engine", "Unable to generate seal: missing client ref.");
return Seal::None;
}
};
if !epoch_manager.zoom_to(&*client, &self.machine, &*self.validators, header) {
debug!(target: "engine", "Unable to zoom to epoch.");
let (validators, set_number) = match self.epoch_set(header) {
Err(err) => {
warn!(target: "engine", "Unable to generate seal: {}", err);
return Seal::None;
}
active_set = epoch_manager.validators().clone();
&active_set as &_
},
Ok(ok) => ok,
};
if is_step_proposer(validators, header.parent_hash(), step, header.author()) {
if is_step_proposer(&*validators, header.parent_hash(), step, header.author()) {
// this is guarded against by `can_propose` unless the block was signed
// on the same step (implies same key) and on a different node.
if parent_step == step.into() {
@ -936,9 +982,15 @@ impl Engine<EthereumMachine> for AuthorityRound {
// only issue the seal if we were the first to reach the compare_and_swap.
if self.step.can_propose.compare_and_swap(true, false, AtomicOrdering::SeqCst) {
// we can drop all accumulated empty step messages that are
// older than the parent step since we're including them in
// the seal
self.clear_empty_steps(parent_step);
// report any skipped primaries between the parent block and
// the block we're sealing
self.report_skipped(header, step, u64::from(parent_step) as usize, &*validators, set_number);
let mut fields = vec![
encode(&step).into_vec(),
encode(&(&H520::from(signature) as &[u8])).into_vec(),
@ -1057,13 +1109,21 @@ impl Engine<EthereumMachine> for AuthorityRound {
)));
}
// TODO [ToDr] Should this go from epoch manager?
// If yes then probably benign reporting needs to be moved further in the verification.
let set_number = header.number();
match verify_timestamp(&self.step.inner, header_step(header, self.empty_steps_transition)?) {
Err(BlockError::InvalidSeal) => {
self.validators.report_benign(header.author(), set_number, header.number());
// This check runs in Phase 1 where there is no guarantee that the parent block is
// already imported, therefore the call to `epoch_set` may fail. In that case we
// won't report the misbehavior but this is not a concern because:
// - Only authorities can report and it's expected that they'll be up-to-date and
// importing, therefore the parent header will most likely be available
// - Even if you are an authority that is syncing the chain, the contract will most
// likely ignore old reports
// - This specific check is only relevant if you're importing (since it checks
// against wall clock)
if let Ok((_, set_number)) = self.epoch_set(header) {
self.validators.report_benign(header.author(), set_number, header.number());
}
Err(BlockError::InvalidSeal.into())
}
Err(e) => Err(e.into()),
@ -1075,8 +1135,8 @@ impl Engine<EthereumMachine> for AuthorityRound {
fn verify_block_family(&self, header: &Header, parent: &Header) -> Result<(), Error> {
let step = header_step(header, self.empty_steps_transition)?;
let parent_step = header_step(parent, self.empty_steps_transition)?;
// TODO [ToDr] Should this go from epoch manager?
let set_number = header.number();
let (validators, set_number) = self.epoch_set(header)?;
// Ensure header is from the step after parent.
if step == parent_step
@ -1103,7 +1163,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
format!("empty step proof for invalid parent hash: {:?}", empty_step.parent_hash)))?;
}
if !empty_step.verify(&*self.validators).unwrap_or(false) {
if !empty_step.verify(&*validators).unwrap_or(false) {
Err(EngineError::InsufficientProof(
format!("invalid empty step proof: {:?}", empty_step)))?;
}
@ -1117,21 +1177,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
}
} else {
// Report skipped primaries.
if let (true, Some(me)) = (step > parent_step + 1, self.signer.read().address()) {
debug!(target: "engine", "Author {} built block with step gap. current step: {}, parent step: {}",
header.author(), step, parent_step);
let mut reported = HashSet::new();
for s in parent_step + 1..step {
let skipped_primary = step_proposer(&*self.validators, &parent.hash(), s);
// Do not report this signer.
if skipped_primary != me {
self.validators.report_benign(&skipped_primary, set_number, header.number());
// Stop reporting once validators start repeating.
if !reported.insert(skipped_primary) { break; }
}
}
}
self.report_skipped(header, step, parent_step, &*validators, set_number);
}
Ok(())
@ -1139,37 +1185,21 @@ impl Engine<EthereumMachine> for AuthorityRound {
// Check the validators.
fn verify_block_external(&self, header: &Header) -> Result<(), Error> {
// fetch correct validator set for current epoch, taking into account
// finality of previous transitions.
let active_set;
let validators = if self.immediate_transitions {
&*self.validators
} else {
// get correct validator set for epoch.
let client = match self.client.read().as_ref().and_then(|weak| weak.upgrade()) {
Some(client) => client,
None => {
debug!(target: "engine", "Unable to verify sig: missing client ref.");
return Err(EngineError::RequiresClient.into())
}
};
let mut epoch_manager = self.epoch_manager.lock();
if !epoch_manager.zoom_to(&*client, &self.machine, &*self.validators, header) {
debug!(target: "engine", "Unable to zoom to epoch.");
return Err(EngineError::RequiresClient.into())
}
active_set = epoch_manager.validators().clone();
&active_set as &_
};
let (validators, set_number) = self.epoch_set(header)?;
// verify signature against fixed list, but reports should go to the
// contract itself.
let res = verify_external(header, validators, self.empty_steps_transition);
if res.is_ok() {
let header_step = header_step(header, self.empty_steps_transition)?;
self.clear_empty_steps(header_step.into());
let res = verify_external(header, &*validators, self.empty_steps_transition);
match res {
Err(Error(ErrorKind::Engine(EngineError::NotProposer(_)), _)) => {
self.validators.report_benign(header.author(), set_number, header.number());
},
Ok(_) => {
// we can drop all accumulated empty step messages that are older than this header's step
let header_step = header_step(header, self.empty_steps_transition)?;
self.clear_empty_steps(header_step.into());
},
_ => {},
}
res
}
@ -1574,7 +1604,6 @@ mod tests {
parent_header.set_seal(vec![encode(&1usize).into_vec()]);
parent_header.set_gas_limit("222222".parse::<U256>().unwrap());
let mut header: Header = Header::default();
header.set_number(1);
header.set_gas_limit("222222".parse::<U256>().unwrap());
header.set_seal(vec![encode(&3usize).into_vec()]);
@ -1584,8 +1613,15 @@ mod tests {
aura.set_signer(Arc::new(AccountProvider::transient_provider()), Default::default(), Default::default());
// Do not report on steps skipped between genesis and first block.
header.set_number(1);
assert!(aura.verify_block_family(&header, &parent_header).is_ok());
assert_eq!(last_benign.load(AtomicOrdering::SeqCst), 1);
assert_eq!(last_benign.load(AtomicOrdering::SeqCst), 0);
// Report on skipped steps otherwise.
header.set_number(2);
assert!(aura.verify_block_family(&header, &parent_header).is_ok());
assert_eq!(last_benign.load(AtomicOrdering::SeqCst), 2);
}
#[test]

View File

@ -53,7 +53,7 @@ pub fn new_validator_set(spec: ValidatorSpec) -> Box<ValidatorSet> {
}
/// A validator set.
pub trait ValidatorSet: Send + Sync {
pub trait ValidatorSet: Send + Sync + 'static {
/// Get the default "Call" helper, for use in general operation.
// TODO [keorn]: this is a hack intended to migrate off of
// a strict dependency on state always being available.

View File

@ -104,6 +104,12 @@ impl ValidatorSet for SimpleList {
}
}
impl AsRef<ValidatorSet> for SimpleList {
fn as_ref(&self) -> &ValidatorSet {
self
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;

View File

@ -61,6 +61,11 @@ pub fn new_expanse<'a, T: Into<SpecParams<'a>>>(params: T) -> Spec {
load(params.into(), include_bytes!("../../res/ethereum/expanse.json"))
}
/// Create a new Tobalaba chain spec.
pub fn new_tobalaba<'a, T: Into<SpecParams<'a>>>(params: T) -> Spec {
load(params.into(), include_bytes!("../../res/ethereum/tobalaba.json"))
}
/// Create a new Musicoin mainnet chain spec.
pub fn new_musicoin<'a, T: Into<SpecParams<'a>>>(params: T) -> Spec {
load(params.into(), include_bytes!("../../res/ethereum/musicoin.json"))

View File

@ -14,8 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::cmp;
use std::time::{Instant, Duration};
use std::collections::{BTreeMap, HashSet, HashMap};
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::sync::Arc;
use ansi_term::Colour;
@ -46,7 +47,7 @@ use client::BlockId;
use executive::contract_address;
use header::{Header, BlockNumber};
use miner;
use miner::pool_client::{PoolClient, CachedNonceClient};
use miner::pool_client::{PoolClient, CachedNonceClient, NonceCache};
use receipt::{Receipt, RichReceipt};
use spec::Spec;
use state::State;
@ -201,7 +202,7 @@ pub struct Miner {
sealing: Mutex<SealingWork>,
params: RwLock<AuthoringParams>,
listeners: RwLock<Vec<Box<NotifyWork>>>,
nonce_cache: RwLock<HashMap<Address, U256>>,
nonce_cache: NonceCache,
gas_pricer: Mutex<GasPricer>,
options: MinerOptions,
// TODO [ToDr] Arc is only required because of price updater
@ -227,6 +228,7 @@ impl Miner {
let limits = options.pool_limits.clone();
let verifier_options = options.pool_verification_options.clone();
let tx_queue_strategy = options.tx_queue_strategy;
let nonce_cache_size = cmp::max(4096, limits.max_count / 4);
Miner {
sealing: Mutex::new(SealingWork {
@ -240,7 +242,7 @@ impl Miner {
params: RwLock::new(AuthoringParams::default()),
listeners: RwLock::new(vec![]),
gas_pricer: Mutex::new(gas_pricer),
nonce_cache: RwLock::new(HashMap::with_capacity(1024)),
nonce_cache: NonceCache::new(nonce_cache_size),
options,
transaction_queue: Arc::new(TransactionQueue::new(limits, verifier_options, tx_queue_strategy)),
accounts,
@ -838,7 +840,40 @@ impl miner::MinerService for Miner {
self.transaction_queue.all_transactions()
}
fn ready_transactions<C>(&self, chain: &C) -> Vec<Arc<VerifiedTransaction>> where
fn pending_transaction_hashes<C>(&self, chain: &C) -> BTreeSet<H256> where
C: ChainInfo + Sync,
{
let chain_info = chain.chain_info();
let from_queue = || self.transaction_queue.pending_hashes(
|sender| self.nonce_cache.get(sender),
);
let from_pending = || {
self.map_existing_pending_block(|sealing| {
sealing.transactions()
.iter()
.map(|signed| signed.hash())
.collect()
}, chain_info.best_block_number)
};
match self.options.pending_set {
PendingSet::AlwaysQueue => {
from_queue()
},
PendingSet::AlwaysSealing => {
from_pending().unwrap_or_default()
},
PendingSet::SealingOrElseQueue => {
from_pending().unwrap_or_else(from_queue)
},
}
}
fn ready_transactions<C>(&self, chain: &C)
-> Vec<Arc<VerifiedTransaction>>
where
C: ChainInfo + Nonce + Sync,
{
let chain_info = chain.chain_info();
@ -1043,14 +1078,19 @@ impl miner::MinerService for Miner {
// 2. We ignore blocks that are `invalid` because it doesn't have any meaning in terms of the transactions that
// are in those blocks
// Clear nonce cache
self.nonce_cache.write().clear();
let has_new_best_block = enacted.len() > 0;
if has_new_best_block {
// Clear nonce cache
self.nonce_cache.clear();
}
// First update gas limit in transaction queue and minimal gas price.
let gas_limit = *chain.best_block_header().gas_limit();
self.update_transaction_queue_limits(gas_limit);
// Then import all transactions...
// Then import all transactions from retracted blocks.
let client = self.pool_client(chain);
{
retracted
@ -1069,10 +1109,7 @@ impl miner::MinerService for Miner {
});
}
// ...and at the end remove the old ones
self.transaction_queue.cull(client);
if enacted.len() > 0 || (imported.len() > 0 && self.options.reseal_on_uncle) {
if has_new_best_block || (imported.len() > 0 && self.options.reseal_on_uncle) {
// Reset `next_allowed_reseal` in case a block is imported.
// Even if min_period is high, we will always attempt to create
// new pending block.
@ -1086,6 +1123,15 @@ impl miner::MinerService for Miner {
self.update_sealing(chain);
}
}
if has_new_best_block {
// Make sure to cull transactions after we update sealing.
// Not culling won't lead to old transactions being added to the block
// (thanks to Ready), but culling can take significant amount of time,
// so best to leave it after we create some work for miners to prevent increased
// uncle rate.
self.transaction_queue.cull(client);
}
}
fn pending_state(&self, latest_block_number: BlockNumber) -> Option<Self::State> {

View File

@ -28,7 +28,7 @@ pub mod stratum;
pub use self::miner::{Miner, MinerOptions, Penalization, PendingSet, AuthoringParams};
use std::sync::Arc;
use std::collections::BTreeMap;
use std::collections::{BTreeSet, BTreeMap};
use bytes::Bytes;
use ethereum_types::{H256, U256, Address};
@ -164,7 +164,13 @@ pub trait MinerService : Send + Sync {
fn next_nonce<C>(&self, chain: &C, address: &Address) -> U256
where C: Nonce + Sync;
/// Get a list of all ready transactions.
/// Get a set of all pending transaction hashes.
///
/// Depending on the settings may look in transaction pool or only in pending block.
fn pending_transaction_hashes<C>(&self, chain: &C) -> BTreeSet<H256> where
C: ChainInfo + Sync;
/// Get a list of all ready transactions either ordered by priority or unordered (cheaper).
///
/// Depending on the settings may look in transaction pool or only in pending block.
fn ready_transactions<C>(&self, chain: &C) -> Vec<Arc<VerifiedTransaction>>

View File

@ -36,10 +36,32 @@ use header::Header;
use miner;
use miner::service_transaction_checker::ServiceTransactionChecker;
type NoncesCache = RwLock<HashMap<Address, U256>>;
/// Cache for state nonces.
#[derive(Debug)]
pub struct NonceCache {
nonces: RwLock<HashMap<Address, U256>>,
limit: usize
}
const MAX_NONCE_CACHE_SIZE: usize = 4096;
const EXPECTED_NONCE_CACHE_SIZE: usize = 2048;
impl NonceCache {
/// Create new cache with a limit of `limit` entries.
pub fn new(limit: usize) -> Self {
NonceCache {
nonces: RwLock::new(HashMap::with_capacity(limit / 2)),
limit,
}
}
/// Retrieve a cached nonce for given sender.
pub fn get(&self, sender: &Address) -> Option<U256> {
self.nonces.read().get(sender).cloned()
}
/// Clear all entries from the cache.
pub fn clear(&self) {
self.nonces.write().clear();
}
}
/// Blockchain accesss for transaction pool.
pub struct PoolClient<'a, C: 'a> {
@ -70,7 +92,7 @@ C: BlockInfo + CallContract,
/// Creates new client given chain, nonce cache, accounts and service transaction verifier.
pub fn new(
chain: &'a C,
cache: &'a NoncesCache,
cache: &'a NonceCache,
engine: &'a EthEngine,
accounts: Option<&'a AccountProvider>,
refuse_service_transactions: bool,
@ -161,7 +183,7 @@ impl<'a, C: 'a> NonceClient for PoolClient<'a, C> where
pub(crate) struct CachedNonceClient<'a, C: 'a> {
client: &'a C,
cache: &'a NoncesCache,
cache: &'a NonceCache,
}
impl<'a, C: 'a> Clone for CachedNonceClient<'a, C> {
@ -176,13 +198,14 @@ impl<'a, C: 'a> Clone for CachedNonceClient<'a, C> {
impl<'a, C: 'a> fmt::Debug for CachedNonceClient<'a, C> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("CachedNonceClient")
.field("cache", &self.cache.read().len())
.field("cache", &self.cache.nonces.read().len())
.field("limit", &self.cache.limit)
.finish()
}
}
impl<'a, C: 'a> CachedNonceClient<'a, C> {
pub fn new(client: &'a C, cache: &'a NoncesCache) -> Self {
pub fn new(client: &'a C, cache: &'a NonceCache) -> Self {
CachedNonceClient {
client,
cache,
@ -194,27 +217,29 @@ impl<'a, C: 'a> NonceClient for CachedNonceClient<'a, C> where
C: Nonce + Sync,
{
fn account_nonce(&self, address: &Address) -> U256 {
if let Some(nonce) = self.cache.read().get(address) {
if let Some(nonce) = self.cache.nonces.read().get(address) {
return *nonce;
}
// We don't check again if cache has been populated.
// It's not THAT expensive to fetch the nonce from state.
let mut cache = self.cache.write();
let mut cache = self.cache.nonces.write();
let nonce = self.client.latest_nonce(address);
cache.insert(*address, nonce);
if cache.len() < MAX_NONCE_CACHE_SIZE {
if cache.len() < self.cache.limit {
return nonce
}
debug!(target: "txpool", "NonceCache: reached limit.");
trace_time!("nonce_cache:clear");
// Remove excessive amount of entries from the cache
while cache.len() > EXPECTED_NONCE_CACHE_SIZE {
// Just remove random entry
if let Some(key) = cache.keys().next().cloned() {
cache.remove(&key);
}
let to_remove: Vec<_> = cache.keys().take(self.cache.limit / 2).cloned().collect();
for x in to_remove {
cache.remove(&x);
}
nonce
}
}

View File

@ -42,12 +42,6 @@ pub enum VerifierType {
Noop,
}
impl Default for VerifierType {
fn default() -> Self {
VerifierType::Canon
}
}
/// Create a new verifier based on type.
pub fn new<C: BlockInfo + CallContract>(v: VerifierType) -> Box<Verifier<C>> {
match v {

View File

@ -376,7 +376,6 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
}
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
trace_time!("sync::read");
ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay), *peer, packet_id, data);
}

View File

@ -658,20 +658,29 @@ impl ChainSync {
None
}
).collect();
let mut peers: Vec<(PeerId, u8)> = confirmed_peers.iter().filter(|&&(peer_id, _)|
self.active_peers.contains(&peer_id)
).map(|v| *v).collect();
random::new().shuffle(&mut peers); //TODO: sort by rating
// prefer peers with higher protocol version
peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2));
trace!(
target: "sync",
"Syncing with peers: {} active, {} confirmed, {} total",
self.active_peers.len(), confirmed_peers.len(), self.peers.len()
);
for (peer_id, _) in peers {
self.sync_peer(io, peer_id, false);
if self.state == SyncState::Waiting {
trace!(target: "sync", "Waiting for the block queue");
} else if self.state == SyncState::SnapshotWaiting {
trace!(target: "sync", "Waiting for the snapshot restoration");
} else {
let mut peers: Vec<(PeerId, u8)> = confirmed_peers.iter().filter(|&&(peer_id, _)|
self.active_peers.contains(&peer_id)
).map(|v| *v).collect();
random::new().shuffle(&mut peers); //TODO: sort by rating
// prefer peers with higher protocol version
peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2));
for (peer_id, _) in peers {
self.sync_peer(io, peer_id, false);
}
}
if
@ -706,14 +715,6 @@ impl ChainSync {
trace!(target: "sync", "Skipping busy peer {}", peer_id);
return;
}
if self.state == SyncState::Waiting {
trace!(target: "sync", "Waiting for the block queue");
return;
}
if self.state == SyncState::SnapshotWaiting {
trace!(target: "sync", "Waiting for the snapshot restoration");
return;
}
(peer.latest_hash.clone(), peer.difficulty.clone(), peer.snapshot_number.as_ref().cloned().unwrap_or(0), peer.snapshot_hash.as_ref().cloned())
} else {
return;

View File

@ -71,7 +71,7 @@ pub fn setup_log(config: &Config) -> Result<Arc<RotatingLogger>, String> {
builder.filter(Some("ws"), LogLevelFilter::Warn);
builder.filter(Some("reqwest"), LogLevelFilter::Warn);
builder.filter(Some("hyper"), LogLevelFilter::Warn);
builder.filter(Some("rustls"), LogLevelFilter::Warn);
builder.filter(Some("rustls"), LogLevelFilter::Error);
// Enable info for others.
builder.filter(None, LogLevelFilter::Info);

View File

@ -462,7 +462,7 @@
<key>OVERWRITE_PERMISSIONS</key>
<false/>
<key>VERSION</key>
<string>1.11.5</string>
<string>1.11.6</string>
</dict>
<key>UUID</key>
<string>2DCD5B81-7BAF-4DA1-9251-6274B089FD36</string>

View File

@ -30,12 +30,13 @@ extern crate linked_hash_map;
extern crate parking_lot;
extern crate price_info;
extern crate rlp;
extern crate trace_time;
extern crate transaction_pool as txpool;
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate trace_time;
#[macro_use]
extern crate log;
#[cfg(test)]

View File

@ -46,20 +46,20 @@ pub enum PrioritizationStrategy {
}
/// Transaction priority.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone, Copy)]
pub(crate) enum Priority {
/// Local transactions (high priority)
///
/// Transactions either from a local account or
/// submitted over local RPC connection via `eth_sendRawTransaction`
Local,
/// Regular transactions received over the network. (no priority boost)
Regular,
/// Transactions from retracted blocks (medium priority)
///
/// When block becomes non-canonical we re-import the transactions it contains
/// to the queue and boost their priority.
Retracted,
/// Regular transactions received over the network. (no priority boost)
Regular,
/// Local transactions (high priority)
///
/// Transactions either from a local account or
/// submitted over local RPC connection via `eth_sendRawTransaction`
Local,
}
impl Priority {
@ -105,6 +105,11 @@ impl VerifiedTransaction {
self.priority
}
/// Gets transaction insertion id.
pub(crate) fn insertion_id(&self) -> usize {
self.insertion_id
}
/// Gets wrapped `SignedTransaction`
pub fn signed(&self) -> &transaction::SignedTransaction {
&self.transaction
@ -114,9 +119,13 @@ impl VerifiedTransaction {
pub fn pending(&self) -> &transaction::PendingTransaction {
&self.transaction
}
}
impl txpool::VerifiedTransaction for VerifiedTransaction {
type Hash = H256;
type Sender = Address;
fn hash(&self) -> &H256 {
&self.hash
}
@ -128,8 +137,4 @@ impl txpool::VerifiedTransaction for VerifiedTransaction {
fn sender(&self) -> &Address {
&self.sender
}
fn insertion_id(&self) -> u64 {
self.insertion_id as u64
}
}

View File

@ -19,7 +19,7 @@
use std::{cmp, fmt};
use std::sync::Arc;
use std::sync::atomic::{self, AtomicUsize};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use ethereum_types::{H256, U256, Address};
use parking_lot::RwLock;
@ -40,6 +40,14 @@ type Pool = txpool::Pool<pool::VerifiedTransaction, scoring::NonceAndGasPrice, L
/// since it only affects transaction Condition.
const TIMESTAMP_CACHE: u64 = 1000;
/// How many senders at once do we attempt to process while culling.
///
/// When running with huge transaction pools, culling can take significant amount of time.
/// To prevent holding `write()` lock on the pool for this long period, we split the work into
/// chunks and allow other threads to utilize the pool in the meantime.
/// This parameter controls how many (best) senders at once will be processed.
const CULL_SENDERS_CHUNK: usize = 1024;
/// Transaction queue status.
#[derive(Debug, Clone, PartialEq)]
pub struct Status {
@ -127,6 +135,50 @@ impl CachedPending {
}
}
#[derive(Debug)]
struct RecentlyRejected {
inner: RwLock<HashMap<H256, transaction::Error>>,
limit: usize,
}
impl RecentlyRejected {
fn new(limit: usize) -> Self {
RecentlyRejected {
limit,
inner: RwLock::new(HashMap::with_capacity(MIN_REJECTED_CACHE_SIZE)),
}
}
fn clear(&self) {
self.inner.write().clear();
}
fn get(&self, hash: &H256) -> Option<transaction::Error> {
self.inner.read().get(hash).cloned()
}
fn insert(&self, hash: H256, err: &transaction::Error) {
if self.inner.read().contains_key(&hash) {
return;
}
let mut inner = self.inner.write();
inner.insert(hash, err.clone());
// clean up
if inner.len() > self.limit {
// randomly remove half of the entries
let to_remove: Vec<_> = inner.keys().take(self.limit / 2).cloned().collect();
for key in to_remove {
inner.remove(&key);
}
}
}
}
/// Minimal size of rejection cache, by default it's equal to queue size.
const MIN_REJECTED_CACHE_SIZE: usize = 2048;
/// Ethereum Transaction Queue
///
/// Responsible for:
@ -139,6 +191,7 @@ pub struct TransactionQueue {
pool: RwLock<Pool>,
options: RwLock<verifier::Options>,
cached_pending: RwLock<CachedPending>,
recently_rejected: RecentlyRejected,
}
impl TransactionQueue {
@ -148,11 +201,13 @@ impl TransactionQueue {
verification_options: verifier::Options,
strategy: PrioritizationStrategy,
) -> Self {
let max_count = limits.max_count;
TransactionQueue {
insertion_id: Default::default(),
pool: RwLock::new(txpool::Pool::new(Default::default(), scoring::NonceAndGasPrice(strategy), limits)),
options: RwLock::new(verification_options),
cached_pending: RwLock::new(CachedPending::none()),
recently_rejected: RecentlyRejected::new(cmp::max(MIN_REJECTED_CACHE_SIZE, max_count / 4)),
}
}
@ -176,21 +231,50 @@ impl TransactionQueue {
let _timer = ::trace_time::PerfTimer::new("pool::verify_and_import");
let options = self.options.read().clone();
let verifier = verifier::Verifier::new(client, options, self.insertion_id.clone());
let transaction_to_replace = {
let pool = self.pool.read();
if pool.is_full() {
pool.worst_transaction().map(|worst| (pool.scoring().clone(), worst))
} else {
None
}
};
let verifier = verifier::Verifier::new(
client,
options,
self.insertion_id.clone(),
transaction_to_replace,
);
let results = transactions
.into_iter()
.map(|transaction| {
if self.pool.read().find(&transaction.hash()).is_some() {
bail!(transaction::Error::AlreadyImported)
let hash = transaction.hash();
if self.pool.read().find(&hash).is_some() {
return Err(transaction::Error::AlreadyImported);
}
verifier.verify_transaction(transaction)
if let Some(err) = self.recently_rejected.get(&hash) {
trace!(target: "txqueue", "[{:?}] Rejecting recently rejected: {:?}", hash, err);
return Err(err);
}
let imported = verifier
.verify_transaction(transaction)
.and_then(|verified| {
self.pool.write().import(verified).map_err(convert_error)
});
match imported {
Ok(_) => Ok(()),
Err(err) => {
self.recently_rejected.insert(hash, &err);
Err(err)
},
}
})
.map(|result| result.and_then(|verified| {
self.pool.write().import(verified)
.map(|_imported| ())
.map_err(convert_error)
}))
.collect::<Vec<_>>();
// Notify about imported transactions.
@ -209,7 +293,20 @@ impl TransactionQueue {
self.pool.read().pending(ready).collect()
}
/// Returns current pneding transactions.
/// Computes unordered set of pending hashes.
///
/// Since strict nonce-checking is not required, you may get some false positive future transactions as well.
pub fn pending_hashes<N>(
&self,
nonce: N,
) -> BTreeSet<H256> where
N: Fn(&Address) -> Option<U256>,
{
let ready = ready::OptionalState::new(nonce);
self.pool.read().pending(ready).map(|tx| tx.hash).collect()
}
/// Returns current pending transactions ordered by priority.
///
/// NOTE: This may return a cached version of pending transaction set.
/// Re-computing the pending set is possible with `#collect_pending` method,
@ -278,27 +375,38 @@ impl TransactionQueue {
}
/// Culls all stalled transactions from the pool.
pub fn cull<C: client::NonceClient>(
pub fn cull<C: client::NonceClient + Clone>(
&self,
client: C,
) {
trace_time!("pool::cull");
// We don't care about future transactions, so nonce_cap is not important.
let nonce_cap = None;
// We want to clear stale transactions from the queue as well.
// (Transactions that are occuping the queue for a long time without being included)
let stale_id = {
let current_id = self.insertion_id.load(atomic::Ordering::Relaxed) as u64;
let current_id = self.insertion_id.load(atomic::Ordering::Relaxed);
// wait at least for half of the queue to be replaced
let gap = self.pool.read().options().max_count / 2;
// but never less than 100 transactions
let gap = cmp::max(100, gap) as u64;
let gap = cmp::max(100, gap);
current_id.checked_sub(gap)
};
let state_readiness = ready::State::new(client, stale_id, nonce_cap);
self.recently_rejected.clear();
let removed = self.pool.write().cull(None, state_readiness);
let mut removed = 0;
let senders: Vec<_> = {
let pool = self.pool.read();
let senders = pool.senders().cloned().collect();
senders
};
for chunk in senders.chunks(CULL_SENDERS_CHUNK) {
trace_time!("pool::cull::chunk");
let state_readiness = ready::State::new(client.clone(), stale_id, nonce_cap);
removed += self.pool.write().cull(Some(chunk), state_readiness);
}
debug!(target: "txqueue", "Removed {} stalled transactions. {}", removed, self.status());
}

View File

@ -54,14 +54,14 @@ pub struct State<C> {
nonces: HashMap<Address, U256>,
state: C,
max_nonce: Option<U256>,
stale_id: Option<u64>,
stale_id: Option<usize>,
}
impl<C> State<C> {
/// Create new State checker, given client interface.
pub fn new(
state: C,
stale_id: Option<u64>,
stale_id: Option<usize>,
max_nonce: Option<U256>,
) -> Self {
State {
@ -91,10 +91,10 @@ impl<C: NonceClient> txpool::Ready<VerifiedTransaction> for State<C> {
match tx.transaction.nonce.cmp(nonce) {
// Before marking as future check for stale ids
cmp::Ordering::Greater => match self.stale_id {
Some(id) if tx.insertion_id() < id => txpool::Readiness::Stalled,
Some(id) if tx.insertion_id() < id => txpool::Readiness::Stale,
_ => txpool::Readiness::Future,
},
cmp::Ordering::Less => txpool::Readiness::Stalled,
cmp::Ordering::Less => txpool::Readiness::Stale,
cmp::Ordering::Equal => {
*nonce = *nonce + 1.into();
txpool::Readiness::Ready
@ -130,6 +130,43 @@ impl txpool::Ready<VerifiedTransaction> for Condition {
}
}
/// Readiness checker that only relies on nonce cache (does actually go to state).
///
/// Checks readiness of transactions by comparing the nonce to state nonce. If nonce
/// isn't found in provided state nonce store, defaults to the tx nonce and updates
/// the nonce store. Useful for using with a state nonce cache when false positives are allowed.
pub struct OptionalState<C> {
nonces: HashMap<Address, U256>,
state: C,
}
impl<C> OptionalState<C> {
pub fn new(state: C) -> Self {
OptionalState {
nonces: Default::default(),
state,
}
}
}
impl<C: Fn(&Address) -> Option<U256>> txpool::Ready<VerifiedTransaction> for OptionalState<C> {
fn is_ready(&mut self, tx: &VerifiedTransaction) -> txpool::Readiness {
let sender = tx.sender();
let state = &self.state;
let nonce = self.nonces.entry(*sender).or_insert_with(|| {
state(sender).unwrap_or_else(|| tx.transaction.nonce)
});
match tx.transaction.nonce.cmp(nonce) {
cmp::Ordering::Greater => txpool::Readiness::Future,
cmp::Ordering::Less => txpool::Readiness::Stale,
cmp::Ordering::Equal => {
*nonce = *nonce + 1.into();
txpool::Readiness::Ready
},
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -178,7 +215,7 @@ mod tests {
let res = State::new(TestClient::new().with_nonce(125), None, None).is_ready(&tx);
// then
assert_eq!(res, txpool::Readiness::Stalled);
assert_eq!(res, txpool::Readiness::Stale);
}
#[test]
@ -190,7 +227,7 @@ mod tests {
let res = State::new(TestClient::new(), Some(1), None).is_ready(&tx);
// then
assert_eq!(res, txpool::Readiness::Stalled);
assert_eq!(res, txpool::Readiness::Stale);
}
#[test]

View File

@ -28,23 +28,45 @@
//! from our local node (own transactions).
use std::cmp;
use std::sync::Arc;
use ethereum_types::U256;
use txpool;
use super::{PrioritizationStrategy, VerifiedTransaction};
use txpool::{self, scoring};
use super::{verifier, PrioritizationStrategy, VerifiedTransaction};
/// Transaction with the same (sender, nonce) can be replaced only if
/// `new_gas_price > old_gas_price + old_gas_price >> SHIFT`
const GAS_PRICE_BUMP_SHIFT: usize = 3; // 2 = 25%, 3 = 12.5%, 4 = 6.25%
/// Calculate minimal gas price requirement.
#[inline]
fn bump_gas_price(old_gp: U256) -> U256 {
old_gp.saturating_add(old_gp >> GAS_PRICE_BUMP_SHIFT)
}
/// Simple, gas-price based scoring for transactions.
///
/// NOTE: Currently penalization does not apply to new transactions that enter the pool.
/// We might want to store penalization status in some persistent state.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct NonceAndGasPrice(pub PrioritizationStrategy);
impl NonceAndGasPrice {
/// Decide if the transaction should even be considered into the pool (if the pool is full).
///
/// Used by Verifier to quickly reject transactions that don't have any chance to get into the pool later on,
/// and save time on more expensive checks like sender recovery, etc.
///
/// NOTE The method is never called for zero-gas-price transactions or local transactions
/// (such transactions are always considered to the pool and potentially rejected later on)
pub fn should_reject_early(&self, old: &VerifiedTransaction, new: &verifier::Transaction) -> bool {
if old.priority().is_local() {
return true
}
&old.transaction.gas_price > new.gas_price()
}
}
impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice {
type Score = U256;
type Event = ();
@ -53,24 +75,24 @@ impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice {
old.transaction.nonce.cmp(&other.transaction.nonce)
}
fn choose(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> txpool::scoring::Choice {
fn choose(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> scoring::Choice {
if old.transaction.nonce != new.transaction.nonce {
return txpool::scoring::Choice::InsertNew
return scoring::Choice::InsertNew
}
let old_gp = old.transaction.gas_price;
let new_gp = new.transaction.gas_price;
let min_required_gp = old_gp + (old_gp >> GAS_PRICE_BUMP_SHIFT);
let min_required_gp = bump_gas_price(old_gp);
match min_required_gp.cmp(&new_gp) {
cmp::Ordering::Greater => txpool::scoring::Choice::RejectNew,
_ => txpool::scoring::Choice::ReplaceOld,
cmp::Ordering::Greater => scoring::Choice::RejectNew,
_ => scoring::Choice::ReplaceOld,
}
}
fn update_scores(&self, txs: &[Arc<VerifiedTransaction>], scores: &mut [U256], change: txpool::scoring::Change) {
use self::txpool::scoring::Change;
fn update_scores(&self, txs: &[txpool::Transaction<VerifiedTransaction>], scores: &mut [U256], change: scoring::Change) {
use self::scoring::Change;
match change {
Change::Culled(_) => {},
@ -79,7 +101,7 @@ impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice {
assert!(i < txs.len());
assert!(i < scores.len());
scores[i] = txs[i].transaction.gas_price;
scores[i] = txs[i].transaction.transaction.gas_price;
let boost = match txs[i].priority() {
super::Priority::Local => 15,
super::Priority::Retracted => 10,
@ -100,24 +122,26 @@ impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice {
}
}
fn should_replace(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> bool {
fn should_replace(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> scoring::Choice {
if old.sender == new.sender {
// prefer earliest transaction
if new.transaction.nonce < old.transaction.nonce {
return true
match new.transaction.nonce.cmp(&old.transaction.nonce) {
cmp::Ordering::Less => scoring::Choice::ReplaceOld,
cmp::Ordering::Greater => scoring::Choice::RejectNew,
cmp::Ordering::Equal => self.choose(old, new),
}
}
// Always kick out non-local transactions in favour of local ones.
if new.priority().is_local() && !old.priority().is_local() {
return true;
}
// And never kick out local transactions in favour of external ones.
if !new.priority().is_local() && old.priority.is_local() {
return false;
}
self.choose(old, new) == txpool::scoring::Choice::ReplaceOld
} else if old.priority().is_local() && new.priority().is_local() {
// accept local transactions over the limit
scoring::Choice::InsertNew
} else {
let old_score = (old.priority(), old.transaction.gas_price);
let new_score = (new.priority(), new.transaction.gas_price);
if new_score > old_score {
scoring::Choice::ReplaceOld
} else {
scoring::Choice::RejectNew
}
}
}
}
@ -125,22 +149,119 @@ impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice {
mod tests {
use super::*;
use std::sync::Arc;
use ethkey::{Random, Generator};
use pool::tests::tx::{Tx, TxExt};
use txpool::Scoring;
use txpool::scoring::Choice::*;
#[test]
fn should_replace_non_local_transaction_with_local_one() {
// given
fn should_replace_same_sender_by_nonce() {
let scoring = NonceAndGasPrice(PrioritizationStrategy::GasPriceOnly);
let tx1 = Tx::default().signed().verified();
let tx2 = {
let mut tx = Tx::default().signed().verified();
tx.priority = ::pool::Priority::Local;
tx
let tx1 = Tx {
nonce: 1,
gas_price: 1,
..Default::default()
};
let tx2 = Tx {
nonce: 2,
gas_price: 100,
..Default::default()
};
let tx3 = Tx {
nonce: 2,
gas_price: 110,
..Default::default()
};
let tx4 = Tx {
nonce: 2,
gas_price: 130,
..Default::default()
};
assert!(scoring.should_replace(&tx1, &tx2));
assert!(!scoring.should_replace(&tx2, &tx1));
let keypair = Random.generate().unwrap();
let txs = vec![tx1, tx2, tx3, tx4].into_iter().enumerate().map(|(i, tx)| {
let verified = tx.unsigned().sign(keypair.secret(), None).verified();
txpool::Transaction {
insertion_id: i as u64,
transaction: Arc::new(verified),
}
}).collect::<Vec<_>>();
assert_eq!(scoring.should_replace(&txs[0], &txs[1]), RejectNew);
assert_eq!(scoring.should_replace(&txs[1], &txs[0]), ReplaceOld);
assert_eq!(scoring.should_replace(&txs[1], &txs[2]), RejectNew);
assert_eq!(scoring.should_replace(&txs[2], &txs[1]), RejectNew);
assert_eq!(scoring.should_replace(&txs[1], &txs[3]), ReplaceOld);
assert_eq!(scoring.should_replace(&txs[3], &txs[1]), RejectNew);
}
#[test]
fn should_replace_different_sender_by_priority_and_gas_price() {
// given
let scoring = NonceAndGasPrice(PrioritizationStrategy::GasPriceOnly);
let tx_regular_low_gas = {
let tx = Tx {
nonce: 1,
gas_price: 1,
..Default::default()
};
let verified_tx = tx.signed().verified();
txpool::Transaction {
insertion_id: 0,
transaction: Arc::new(verified_tx),
}
};
let tx_regular_high_gas = {
let tx = Tx {
nonce: 2,
gas_price: 10,
..Default::default()
};
let verified_tx = tx.signed().verified();
txpool::Transaction {
insertion_id: 1,
transaction: Arc::new(verified_tx),
}
};
let tx_local_low_gas = {
let tx = Tx {
nonce: 2,
gas_price: 1,
..Default::default()
};
let mut verified_tx = tx.signed().verified();
verified_tx.priority = ::pool::Priority::Local;
txpool::Transaction {
insertion_id: 2,
transaction: Arc::new(verified_tx),
}
};
let tx_local_high_gas = {
let tx = Tx {
nonce: 1,
gas_price: 10,
..Default::default()
};
let mut verified_tx = tx.signed().verified();
verified_tx.priority = ::pool::Priority::Local;
txpool::Transaction {
insertion_id: 3,
transaction: Arc::new(verified_tx),
}
};
assert_eq!(scoring.should_replace(&tx_regular_low_gas, &tx_regular_high_gas), ReplaceOld);
assert_eq!(scoring.should_replace(&tx_regular_high_gas, &tx_regular_low_gas), RejectNew);
assert_eq!(scoring.should_replace(&tx_regular_high_gas, &tx_local_low_gas), ReplaceOld);
assert_eq!(scoring.should_replace(&tx_local_low_gas, &tx_regular_high_gas), RejectNew);
assert_eq!(scoring.should_replace(&tx_local_low_gas, &tx_local_high_gas), InsertNew);
assert_eq!(scoring.should_replace(&tx_local_high_gas, &tx_regular_low_gas), RejectNew);
}
#[test]
@ -155,41 +276,44 @@ mod tests {
1 => ::pool::Priority::Retracted,
_ => ::pool::Priority::Regular,
};
Arc::new(verified)
txpool::Transaction {
insertion_id: 0,
transaction: Arc::new(verified),
}
}).collect::<Vec<_>>();
let initial_scores = vec![U256::from(0), 0.into(), 0.into()];
// No update required
let mut scores = initial_scores.clone();
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::Culled(0));
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::Culled(1));
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::Culled(2));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::Culled(0));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::Culled(1));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::Culled(2));
assert_eq!(scores, initial_scores);
let mut scores = initial_scores.clone();
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::RemovedAt(0));
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::RemovedAt(1));
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::RemovedAt(2));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::RemovedAt(0));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::RemovedAt(1));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::RemovedAt(2));
assert_eq!(scores, initial_scores);
// Compute score at given index
let mut scores = initial_scores.clone();
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::InsertedAt(0));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::InsertedAt(0));
assert_eq!(scores, vec![32768.into(), 0.into(), 0.into()]);
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::InsertedAt(1));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::InsertedAt(1));
assert_eq!(scores, vec![32768.into(), 1024.into(), 0.into()]);
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::InsertedAt(2));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::InsertedAt(2));
assert_eq!(scores, vec![32768.into(), 1024.into(), 1.into()]);
let mut scores = initial_scores.clone();
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::ReplacedAt(0));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::ReplacedAt(0));
assert_eq!(scores, vec![32768.into(), 0.into(), 0.into()]);
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::ReplacedAt(1));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::ReplacedAt(1));
assert_eq!(scores, vec![32768.into(), 1024.into(), 0.into()]);
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::ReplacedAt(2));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::ReplacedAt(2));
assert_eq!(scores, vec![32768.into(), 1024.into(), 1.into()]);
// Check penalization
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::Event(()));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::Event(()));
assert_eq!(scores, vec![32768.into(), 128.into(), 0.into()]);
}
}

View File

@ -41,7 +41,6 @@ fn new_queue() -> TransactionQueue {
PrioritizationStrategy::GasPriceOnly,
)
}
#[test]
fn should_return_correct_nonces_when_dropped_because_of_limit() {
// given
@ -63,8 +62,8 @@ fn should_return_correct_nonces_when_dropped_because_of_limit() {
let nonce = tx1.nonce;
// when
let r1= txq.import(TestClient::new(), vec![tx1].local());
let r2= txq.import(TestClient::new(), vec![tx2].local());
let r1 = txq.import(TestClient::new(), vec![tx1].retracted());
let r2 = txq.import(TestClient::new(), vec![tx2].retracted());
assert_eq!(r1, vec![Ok(())]);
assert_eq!(r2, vec![Err(transaction::Error::LimitReached)]);
assert_eq!(txq.status().status.transaction_count, 1);
@ -72,6 +71,58 @@ fn should_return_correct_nonces_when_dropped_because_of_limit() {
// then
assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(nonce + 1.into()));
// when
let tx1 = Tx::gas_price(2).signed();
let tx2 = Tx::gas_price(2).signed();
let tx3 = Tx::gas_price(1).signed();
let tx4 = Tx::gas_price(3).signed();
let res = txq.import(TestClient::new(), vec![tx1, tx2].retracted());
let res2 = txq.import(TestClient::new(), vec![tx3, tx4].retracted());
// then
assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(res2, vec![
// The error here indicates reaching the limit
// and minimal effective gas price taken into account.
Err(transaction::Error::InsufficientGasPrice { minimal: 2.into(), got: 1.into() }),
Ok(())
]);
assert_eq!(txq.status().status.transaction_count, 3);
// First inserted transacton got dropped because of limit
assert_eq!(txq.next_nonce(TestClient::new(), &sender), None);
}
#[test]
fn should_never_drop_local_transactions_from_different_senders() {
// given
let txq = TransactionQueue::new(
txpool::Options {
max_count: 3,
max_per_sender: 1,
max_mem_usage: 50
},
verifier::Options {
minimal_gas_price: 1.into(),
block_gas_limit: 1_000_000.into(),
tx_gas_limit: 1_000_000.into(),
},
PrioritizationStrategy::GasPriceOnly,
);
let (tx1, tx2) = Tx::gas_price(2).signed_pair();
let sender = tx1.sender();
let nonce = tx1.nonce;
// when
let r1 = txq.import(TestClient::new(), vec![tx1].local());
let r2 = txq.import(TestClient::new(), vec![tx2].local());
assert_eq!(r1, vec![Ok(())]);
// max-per-sender is reached, that's ok.
assert_eq!(r2, vec![Err(transaction::Error::LimitReached)]);
assert_eq!(txq.status().status.transaction_count, 1);
// then
assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(nonce + 1.into()));
// when
let tx1 = Tx::gas_price(2).signed();
let tx2 = Tx::gas_price(2).signed();
@ -82,10 +133,9 @@ fn should_return_correct_nonces_when_dropped_because_of_limit() {
// then
assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(res2, vec![Err(transaction::Error::LimitReached), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 3);
// First inserted transacton got dropped because of limit
assert_eq!(txq.next_nonce(TestClient::new(), &sender), None);
assert_eq!(res2, vec![Ok(()), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 5);
assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(nonce + 1.into()));
}
#[test]
@ -815,8 +865,8 @@ fn should_avoid_verifying_transaction_already_in_pool() {
},
PrioritizationStrategy::GasPriceOnly,
);
let client = TestClient::new();
let tx1 = Tx::default().signed().unverified();
let client = TestClient::new().with_balance(1_000_000_000);
let tx1 = Tx::gas_price(2).signed().unverified();
let res = txq.import(client.clone(), vec![tx1.clone()]);
assert_eq!(res, vec![Ok(())]);
@ -832,3 +882,82 @@ fn should_avoid_verifying_transaction_already_in_pool() {
// then
assert_eq!(txq.status().status.transaction_count, 1);
}
#[test]
fn should_avoid_reverifying_recently_rejected_transactions() {
// given
let txq = TransactionQueue::new(
txpool::Options {
max_count: 1,
max_per_sender: 2,
max_mem_usage: 50
},
verifier::Options {
minimal_gas_price: 1.into(),
block_gas_limit: 1_000_000.into(),
tx_gas_limit: 1_000_000.into(),
},
PrioritizationStrategy::GasPriceOnly,
);
let client = TestClient::new();
let tx1 = Tx::gas_price(10_000).signed().unverified();
let res = txq.import(client.clone(), vec![tx1.clone()]);
assert_eq!(res, vec![Err(transaction::Error::InsufficientBalance {
balance: 0xf67c.into(),
cost: 0xc8458e4.into(),
})]);
assert_eq!(txq.status().status.transaction_count, 0);
assert!(client.was_verification_triggered());
// when
let client = TestClient::new();
let res = txq.import(client.clone(), vec![tx1]);
assert_eq!(res, vec![Err(transaction::Error::InsufficientBalance {
balance: 0xf67c.into(),
cost: 0xc8458e4.into(),
})]);
assert!(!client.was_verification_triggered());
// then
assert_eq!(txq.status().status.transaction_count, 0);
}
fn should_reject_early_in_case_gas_price_is_less_than_min_effective() {
// given
let txq = TransactionQueue::new(
txpool::Options {
max_count: 1,
max_per_sender: 2,
max_mem_usage: 50
},
verifier::Options {
minimal_gas_price: 1.into(),
block_gas_limit: 1_000_000.into(),
tx_gas_limit: 1_000_000.into(),
},
PrioritizationStrategy::GasPriceOnly,
);
let client = TestClient::new().with_balance(1_000_000_000);
let tx1 = Tx::gas_price(2).signed().unverified();
let res = txq.import(client.clone(), vec![tx1]);
assert_eq!(res, vec![Ok(())]);
assert_eq!(txq.status().status.transaction_count, 1);
assert!(client.was_verification_triggered());
// when
let client = TestClient::new();
let tx1 = Tx::default().signed().unverified();
let res = txq.import(client.clone(), vec![tx1]);
assert_eq!(res, vec![Err(transaction::Error::InsufficientGasPrice {
minimal: 2.into(),
got: 1.into(),
})]);
assert!(!client.was_verification_triggered());
// then
assert_eq!(txq.status().status.transaction_count, 1);
}

View File

@ -23,9 +23,9 @@ use pool::{verifier, VerifiedTransaction};
#[derive(Clone)]
pub struct Tx {
nonce: u64,
gas: u64,
gas_price: u64,
pub nonce: u64,
pub gas: u64,
pub gas_price: u64,
}
impl Default for Tx {

View File

@ -85,6 +85,15 @@ impl Transaction {
}
}
/// Return transaction gas price
pub fn gas_price(&self) -> &U256 {
match *self {
Transaction::Unverified(ref tx) => &tx.gas_price,
Transaction::Retracted(ref tx) => &tx.gas_price,
Transaction::Local(ref tx) => &tx.gas_price,
}
}
fn gas(&self) -> &U256 {
match *self {
Transaction::Unverified(ref tx) => &tx.gas,
@ -93,15 +102,6 @@ impl Transaction {
}
}
fn gas_price(&self) -> &U256 {
match *self {
Transaction::Unverified(ref tx) => &tx.gas_price,
Transaction::Retracted(ref tx) => &tx.gas_price,
Transaction::Local(ref tx) => &tx.gas_price,
}
}
fn transaction(&self) -> &transaction::Transaction {
match *self {
Transaction::Unverified(ref tx) => &*tx,
@ -129,24 +129,31 @@ impl Transaction {
///
/// Verification can be run in parallel for all incoming transactions.
#[derive(Debug)]
pub struct Verifier<C> {
pub struct Verifier<C, S, V> {
client: C,
options: Options,
id: Arc<AtomicUsize>,
transaction_to_replace: Option<(S, Arc<V>)>,
}
impl<C> Verifier<C> {
impl<C, S, V> Verifier<C, S, V> {
/// Creates new transaction verfier with specified options.
pub fn new(client: C, options: Options, id: Arc<AtomicUsize>) -> Self {
pub fn new(
client: C,
options: Options,
id: Arc<AtomicUsize>,
transaction_to_replace: Option<(S, Arc<V>)>,
) -> Self {
Verifier {
client,
options,
id,
transaction_to_replace,
}
}
}
impl<C: Client> txpool::Verifier<Transaction> for Verifier<C> {
impl<C: Client> txpool::Verifier<Transaction> for Verifier<C, ::pool::scoring::NonceAndGasPrice, VerifiedTransaction> {
type Error = transaction::Error;
type VerifiedTransaction = VerifiedTransaction;
@ -165,7 +172,7 @@ impl<C: Client> txpool::Verifier<Transaction> for Verifier<C> {
if tx.gas() > &gas_limit {
debug!(
target: "txqueue",
"[{:?}] Dropping transaction above gas limit: {} > min({}, {})",
"[{:?}] Rejected transaction above gas limit: {} > min({}, {})",
hash,
tx.gas(),
self.options.block_gas_limit,
@ -180,7 +187,7 @@ impl<C: Client> txpool::Verifier<Transaction> for Verifier<C> {
let minimal_gas = self.client.required_gas(tx.transaction());
if tx.gas() < &minimal_gas {
trace!(target: "txqueue",
"[{:?}] Dropping transaction with insufficient gas: {} < {}",
"[{:?}] Rejected transaction with insufficient gas: {} < {}",
hash,
tx.gas(),
minimal_gas,
@ -193,22 +200,40 @@ impl<C: Client> txpool::Verifier<Transaction> for Verifier<C> {
}
let is_own = tx.is_local();
// Quick exit for non-service transactions
if tx.gas_price() < &self.options.minimal_gas_price
&& !tx.gas_price().is_zero()
&& !is_own
{
trace!(
target: "txqueue",
"[{:?}] Rejected tx below minimal gas price threshold: {} < {}",
hash,
tx.gas_price(),
self.options.minimal_gas_price,
);
bail!(transaction::Error::InsufficientGasPrice {
minimal: self.options.minimal_gas_price,
got: *tx.gas_price(),
});
// Quick exit for non-service and non-local transactions
//
// We're checking if the transaction is below configured minimal gas price
// or the effective minimal gas price in case the pool is full.
if !tx.gas_price().is_zero() && !is_own {
if tx.gas_price() < &self.options.minimal_gas_price {
trace!(
target: "txqueue",
"[{:?}] Rejected tx below minimal gas price threshold: {} < {}",
hash,
tx.gas_price(),
self.options.minimal_gas_price,
);
bail!(transaction::Error::InsufficientGasPrice {
minimal: self.options.minimal_gas_price,
got: *tx.gas_price(),
});
}
if let Some((ref scoring, ref vtx)) = self.transaction_to_replace {
if scoring.should_reject_early(vtx, &tx) {
trace!(
target: "txqueue",
"[{:?}] Rejected tx early, cause it doesn't have any chance to get to the pool: (gas price: {} < {})",
hash,
tx.gas_price(),
vtx.transaction.gas_price,
);
bail!(transaction::Error::InsufficientGasPrice {
minimal: vtx.transaction.gas_price,
got: *tx.gas_price(),
});
}
}
}
// Some more heavy checks below.

View File

@ -10,7 +10,7 @@
!define DESCRIPTION "Fast, light, robust Ethereum implementation"
!define VERSIONMAJOR 1
!define VERSIONMINOR 11
!define VERSIONBUILD 5
!define VERSIONBUILD 6
!define ARGS ""
!define FIRST_START_ARGS "--mode=passive ui"

View File

@ -362,7 +362,7 @@ fn execute_import(cmd: ImportBlockchain) -> Result<(), String> {
algorithm,
cmd.pruning_history,
cmd.pruning_memory,
cmd.check_seal
cmd.check_seal,
);
client_config.queue.verifier_settings = cmd.verifier_settings;

View File

@ -294,7 +294,7 @@ usage! {
ARG arg_chain: (String) = "foundation", or |c: &Config| c.parity.as_ref()?.chain.clone(),
"--chain=[CHAIN]",
"Specify the blockchain type. CHAIN may be either a JSON chain specification file or olympic, frontier, homestead, mainnet, morden, ropsten, classic, expanse, musicoin, ellaism, easthub, social, testnet, kovan or dev.",
"Specify the blockchain type. CHAIN may be either a JSON chain specification file or olympic, frontier, homestead, mainnet, morden, ropsten, classic, expanse, tobalaba, musicoin, ellaism, easthub, social, testnet, kovan or dev.",
ARG arg_keys_path: (String) = "$BASE/keys", or |c: &Config| c.parity.as_ref()?.keys_path.clone(),
"--keys-path=[PATH]",

View File

@ -33,6 +33,7 @@ pub enum SpecType {
Foundation,
Morden,
Ropsten,
Tobalaba,
Kovan,
Olympic,
Classic,
@ -61,6 +62,7 @@ impl str::FromStr for SpecType {
"morden" | "classic-testnet" => SpecType::Morden,
"ropsten" => SpecType::Ropsten,
"kovan" | "testnet" => SpecType::Kovan,
"tobalaba" => SpecType::Tobalaba,
"olympic" => SpecType::Olympic,
"expanse" => SpecType::Expanse,
"musicoin" => SpecType::Musicoin,
@ -88,6 +90,7 @@ impl fmt::Display for SpecType {
SpecType::Easthub => "easthub",
SpecType::Social => "social",
SpecType::Kovan => "kovan",
SpecType::Tobalaba => "tobalaba",
SpecType::Dev => "dev",
SpecType::Custom(ref custom) => custom,
})
@ -108,6 +111,7 @@ impl SpecType {
SpecType::Ellaism => Ok(ethereum::new_ellaism(params)),
SpecType::Easthub => Ok(ethereum::new_easthub(params)),
SpecType::Social => Ok(ethereum::new_social(params)),
SpecType::Tobalaba => Ok(ethereum::new_tobalaba(params)),
SpecType::Kovan => Ok(ethereum::new_kovan(params)),
SpecType::Dev => Ok(Spec::new_instant()),
SpecType::Custom(ref filename) => {

View File

@ -544,6 +544,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
// fetch service
let fetch = fetch::Client::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?;
let txpool_size = cmd.miner_options.pool_limits.max_count;
// create miner
let miner = Arc::new(Miner::new(
cmd.miner_options,
@ -600,6 +601,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
);
client_config.queue.verifier_settings = cmd.verifier_settings;
client_config.transaction_verification_queue_size = ::std::cmp::max(2048, txpool_size / 4);
// set up bootnodes
let mut net_conf = cmd.net_conf;

View File

@ -182,7 +182,7 @@ impl SnapshotCommand {
algorithm,
self.pruning_history,
self.pruning_memory,
true
true,
);
let client_db = db::open_client_db(&client_path, &client_config)?;

View File

@ -122,7 +122,7 @@ impl Default for UserDefaults {
fn default() -> Self {
UserDefaults {
is_first_launch: true,
pruning: Algorithm::default(),
pruning: Algorithm::OverlayRecent,
tracing: false,
fat_db: false,
mode: Mode::Active,

View File

@ -40,7 +40,7 @@ mod subscription_manager;
pub use self::dispatch::{Dispatcher, FullDispatcher};
pub use self::network_settings::NetworkSettings;
pub use self::poll_manager::PollManager;
pub use self::poll_filter::{PollFilter, limit_logs};
pub use self::poll_filter::{PollFilter, SyncPollFilter, limit_logs};
pub use self::requests::{
TransactionRequest, FilledTransactionRequest, ConfirmationRequest, ConfirmationPayload, CallRequest,
};

View File

@ -1,18 +1,40 @@
//! Helper type with all filter state data.
use std::collections::HashSet;
use std::{
collections::{BTreeSet, HashSet},
sync::Arc,
};
use ethereum_types::H256;
use parking_lot::Mutex;
use v1::types::{Filter, Log};
pub type BlockNumber = u64;
/// Thread-safe filter state.
#[derive(Clone)]
pub struct SyncPollFilter(Arc<Mutex<PollFilter>>);
impl SyncPollFilter {
/// New `SyncPollFilter`
pub fn new(f: PollFilter) -> Self {
SyncPollFilter(Arc::new(Mutex::new(f)))
}
/// Modify underlying filter
pub fn modify<F, R>(&self, f: F) -> R where
F: FnOnce(&mut PollFilter) -> R,
{
f(&mut self.0.lock())
}
}
/// Filter state.
#[derive(Clone)]
pub enum PollFilter {
/// Number of last block which client was notified about.
Block(BlockNumber),
/// Hashes of all transactions which client was notified about.
PendingTransaction(Vec<H256>),
/// Hashes of all pending transactions the client knows about.
PendingTransaction(BTreeSet<H256>),
/// Number of From block number, last seen block hash, pending logs and log filter itself.
Logs(BlockNumber, Option<H256>, HashSet<Log>, Filter)
}

View File

@ -608,11 +608,9 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM, T: StateInfo + 'static> Eth for EthClient<
}
fn block_transaction_count_by_number(&self, num: BlockNumber) -> BoxFuture<Option<RpcU256>> {
let block_number = self.client.chain_info().best_block_number;
Box::new(future::ok(match num {
BlockNumber::Pending =>
self.miner.pending_transactions(block_number).map(|x| x.len().into()),
Some(self.miner.pending_transaction_hashes(&*self.client).len().into()),
_ =>
self.client.block(block_number_to_id(num)).map(|block| block.transactions_count().into())
}))

View File

@ -17,7 +17,7 @@
//! Eth Filter RPC implementation
use std::sync::Arc;
use std::collections::HashSet;
use std::collections::BTreeSet;
use ethcore::miner::{self, MinerService};
use ethcore::filter::Filter as EthcoreFilter;
@ -30,7 +30,7 @@ use jsonrpc_core::futures::{future, Future};
use jsonrpc_core::futures::future::Either;
use v1::traits::EthFilter;
use v1::types::{BlockNumber, Index, Filter, FilterChanges, Log, H256 as RpcH256, U256 as RpcU256};
use v1::helpers::{errors, PollFilter, PollManager, limit_logs};
use v1::helpers::{errors, SyncPollFilter, PollFilter, PollManager, limit_logs};
use v1::impls::eth::pending_logs;
/// Something which provides data that can be filtered over.
@ -41,8 +41,8 @@ pub trait Filterable {
/// Get a block hash by block id.
fn block_hash(&self, id: BlockId) -> Option<H256>;
/// pending transaction hashes at the given block.
fn pending_transactions_hashes(&self) -> Vec<H256>;
/// pending transaction hashes at the given block (unordered).
fn pending_transaction_hashes(&self) -> BTreeSet<H256>;
/// Get logs that match the given filter.
fn logs(&self, filter: EthcoreFilter) -> BoxFuture<Vec<Log>>;
@ -51,7 +51,7 @@ pub trait Filterable {
fn pending_logs(&self, block_number: u64, filter: &EthcoreFilter) -> Vec<Log>;
/// Get a reference to the poll manager.
fn polls(&self) -> &Mutex<PollManager<PollFilter>>;
fn polls(&self) -> &Mutex<PollManager<SyncPollFilter>>;
/// Get removed logs within route from the given block to the nearest canon block, not including the canon block. Also returns how many logs have been traversed.
fn removed_logs(&self, block_hash: H256, filter: &EthcoreFilter) -> (Vec<Log>, u64);
@ -61,7 +61,7 @@ pub trait Filterable {
pub struct EthFilterClient<C, M> {
client: Arc<C>,
miner: Arc<M>,
polls: Mutex<PollManager<PollFilter>>,
polls: Mutex<PollManager<SyncPollFilter>>,
}
impl<C, M> EthFilterClient<C, M> {
@ -87,11 +87,8 @@ impl<C, M> Filterable for EthFilterClient<C, M> where
self.client.block_hash(id)
}
fn pending_transactions_hashes(&self) -> Vec<H256> {
self.miner.ready_transactions(&*self.client)
.into_iter()
.map(|tx| tx.signed().hash())
.collect()
fn pending_transaction_hashes(&self) -> BTreeSet<H256> {
self.miner.pending_transaction_hashes(&*self.client)
}
fn logs(&self, filter: EthcoreFilter) -> BoxFuture<Vec<Log>> {
@ -102,7 +99,7 @@ impl<C, M> Filterable for EthFilterClient<C, M> where
pending_logs(&*self.miner, block_number, filter)
}
fn polls(&self) -> &Mutex<PollManager<PollFilter>> { &self.polls }
fn polls(&self) -> &Mutex<PollManager<SyncPollFilter>> { &self.polls }
fn removed_logs(&self, block_hash: H256, filter: &EthcoreFilter) -> (Vec<Log>, u64) {
let inner = || -> Option<Vec<H256>> {
@ -145,127 +142,124 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
fn new_filter(&self, filter: Filter) -> Result<RpcU256> {
let mut polls = self.polls().lock();
let block_number = self.best_block_number();
let id = polls.create_poll(PollFilter::Logs(block_number, None, Default::default(), filter));
let id = polls.create_poll(SyncPollFilter::new(PollFilter::Logs(block_number, None, Default::default(), filter)));
Ok(id.into())
}
fn new_block_filter(&self) -> Result<RpcU256> {
let mut polls = self.polls().lock();
// +1, since we don't want to include the current block
let id = polls.create_poll(PollFilter::Block(self.best_block_number() + 1));
let id = polls.create_poll(SyncPollFilter::new(PollFilter::Block(self.best_block_number() + 1)));
Ok(id.into())
}
fn new_pending_transaction_filter(&self) -> Result<RpcU256> {
let mut polls = self.polls().lock();
let pending_transactions = self.pending_transactions_hashes();
let id = polls.create_poll(PollFilter::PendingTransaction(pending_transactions));
let pending_transactions = self.pending_transaction_hashes();
let id = polls.create_poll(SyncPollFilter::new(PollFilter::PendingTransaction(pending_transactions)));
Ok(id.into())
}
fn filter_changes(&self, index: Index) -> BoxFuture<FilterChanges> {
let mut polls = self.polls().lock();
Box::new(match polls.poll_mut(&index.value()) {
None => Either::A(future::err(errors::filter_not_found())),
Some(filter) => match *filter {
PollFilter::Block(ref mut block_number) => {
// +1, cause we want to return hashes including current block hash.
let current_number = self.best_block_number() + 1;
let hashes = (*block_number..current_number).into_iter()
.map(BlockId::Number)
.filter_map(|id| self.block_hash(id).map(Into::into))
.collect::<Vec<RpcH256>>();
let filter = match self.polls().lock().poll_mut(&index.value()) {
Some(filter) => filter.clone(),
None => return Box::new(future::err(errors::filter_not_found())),
};
*block_number = current_number;
Box::new(filter.modify(|filter| match *filter {
PollFilter::Block(ref mut block_number) => {
// +1, cause we want to return hashes including current block hash.
let current_number = self.best_block_number() + 1;
let hashes = (*block_number..current_number).into_iter()
.map(BlockId::Number)
.filter_map(|id| self.block_hash(id).map(Into::into))
.collect::<Vec<RpcH256>>();
Either::A(future::ok(FilterChanges::Hashes(hashes)))
},
PollFilter::PendingTransaction(ref mut previous_hashes) => {
// get hashes of pending transactions
let current_hashes = self.pending_transactions_hashes();
*block_number = current_number;
let new_hashes =
{
let previous_hashes_set = previous_hashes.iter().collect::<HashSet<_>>();
Either::A(future::ok(FilterChanges::Hashes(hashes)))
},
PollFilter::PendingTransaction(ref mut previous_hashes) => {
// get hashes of pending transactions
let current_hashes = self.pending_transaction_hashes();
// find all new hashes
current_hashes
.iter()
.filter(|hash| !previous_hashes_set.contains(hash))
.cloned()
.map(Into::into)
.collect::<Vec<RpcH256>>()
};
let new_hashes = {
// find all new hashes
current_hashes.difference(previous_hashes)
.cloned()
.map(Into::into)
.collect()
};
// save all hashes of pending transactions
*previous_hashes = current_hashes;
// save all hashes of pending transactions
*previous_hashes = current_hashes;
// return new hashes
Either::A(future::ok(FilterChanges::Hashes(new_hashes)))
},
PollFilter::Logs(ref mut block_number, ref mut last_block_hash, ref mut previous_logs, ref filter) => {
// retrive the current block number
let current_number = self.best_block_number();
// return new hashes
Either::A(future::ok(FilterChanges::Hashes(new_hashes)))
},
PollFilter::Logs(ref mut block_number, ref mut last_block_hash, ref mut previous_logs, ref filter) => {
// retrive the current block number
let current_number = self.best_block_number();
// check if we need to check pending hashes
let include_pending = filter.to_block == Some(BlockNumber::Pending);
// check if we need to check pending hashes
let include_pending = filter.to_block == Some(BlockNumber::Pending);
// build appropriate filter
let mut filter: EthcoreFilter = filter.clone().into();
// build appropriate filter
let mut filter: EthcoreFilter = filter.clone().into();
// retrieve reorg logs
let (mut reorg, reorg_len) = last_block_hash.map_or_else(|| (Vec::new(), 0), |h| self.removed_logs(h, &filter));
*block_number -= reorg_len as u64;
// retrieve reorg logs
let (mut reorg, reorg_len) = last_block_hash.map_or_else(|| (Vec::new(), 0), |h| self.removed_logs(h, &filter));
*block_number -= reorg_len as u64;
filter.from_block = BlockId::Number(*block_number);
filter.to_block = BlockId::Latest;
filter.from_block = BlockId::Number(*block_number);
filter.to_block = BlockId::Latest;
// retrieve pending logs
let pending = if include_pending {
let pending_logs = self.pending_logs(current_number, &filter);
// retrieve pending logs
let pending = if include_pending {
let pending_logs = self.pending_logs(current_number, &filter);
// remove logs about which client was already notified about
let new_pending_logs: Vec<_> = pending_logs.iter()
.filter(|p| !previous_logs.contains(p))
.cloned()
.collect();
// remove logs about which client was already notified about
let new_pending_logs: Vec<_> = pending_logs.iter()
.filter(|p| !previous_logs.contains(p))
.cloned()
.collect();
// save all logs retrieved by client
*previous_logs = pending_logs.into_iter().collect();
// save all logs retrieved by client
*previous_logs = pending_logs.into_iter().collect();
new_pending_logs
} else {
Vec::new()
};
new_pending_logs
} else {
Vec::new()
};
// save the number of the next block as a first block from which
// we want to get logs
*block_number = current_number + 1;
// save the number of the next block as a first block from which
// we want to get logs
*block_number = current_number + 1;
// save the current block hash, which we used to get back to the
// canon chain in case of reorg.
*last_block_hash = self.block_hash(BlockId::Number(current_number));
// save the current block hash, which we used to get back to the
// canon chain in case of reorg.
*last_block_hash = self.block_hash(BlockId::Number(current_number));
// retrieve logs in range from_block..min(BlockId::Latest..to_block)
let limit = filter.limit;
Either::B(self.logs(filter)
.map(move |logs| { reorg.extend(logs); reorg }) // append reorg logs in the front
.map(move |mut logs| { logs.extend(pending); logs }) // append fetched pending logs
.map(move |logs| limit_logs(logs, limit)) // limit the logs
.map(FilterChanges::Logs))
}
// retrieve logs in range from_block..min(BlockId::Latest..to_block)
let limit = filter.limit;
Either::B(self.logs(filter)
.map(move |logs| { reorg.extend(logs); reorg }) // append reorg logs in the front
.map(move |mut logs| { logs.extend(pending); logs }) // append fetched pending logs
.map(move |logs| limit_logs(logs, limit)) // limit the logs
.map(FilterChanges::Logs))
}
})
}))
}
fn filter_logs(&self, index: Index) -> BoxFuture<Vec<Log>> {
let filter = {
let mut polls = self.polls().lock();
match polls.poll(&index.value()) {
Some(&PollFilter::Logs(ref _block_number, ref _last_block_hash, ref _previous_log, ref filter)) => filter.clone(),
// just empty array
Some(_) => return Box::new(future::ok(Vec::new())),
match polls.poll(&index.value()).and_then(|f| f.modify(|filter| match *filter {
PollFilter::Logs(.., ref filter) => Some(filter.clone()),
_ => None,
})) {
Some(filter) => filter,
None => return Box::new(future::err(errors::filter_not_found())),
}
};

View File

@ -16,6 +16,7 @@
//! Eth RPC interface for the light client.
use std::collections::BTreeSet;
use std::sync::Arc;
use jsonrpc_core::{Result, BoxFuture};
@ -41,7 +42,7 @@ use transaction::SignedTransaction;
use v1::impls::eth_filter::Filterable;
use v1::helpers::{errors, limit_logs};
use v1::helpers::{PollFilter, PollManager};
use v1::helpers::{SyncPollFilter, PollManager};
use v1::helpers::light_fetch::{self, LightFetch};
use v1::traits::Eth;
use v1::types::{
@ -61,7 +62,7 @@ pub struct EthClient<T> {
transaction_queue: Arc<RwLock<TransactionQueue>>,
accounts: Arc<AccountProvider>,
cache: Arc<Mutex<LightDataCache>>,
polls: Mutex<PollManager<PollFilter>>,
polls: Mutex<PollManager<SyncPollFilter>>,
gas_price_percentile: usize,
}
@ -533,8 +534,8 @@ impl<T: LightChainClient + 'static> Filterable for EthClient<T> {
self.client.block_hash(id)
}
fn pending_transactions_hashes(&self) -> Vec<::ethereum_types::H256> {
Vec::new()
fn pending_transaction_hashes(&self) -> BTreeSet<::ethereum_types::H256> {
BTreeSet::new()
}
fn logs(&self, filter: EthcoreFilter) -> BoxFuture<Vec<Log>> {
@ -545,7 +546,7 @@ impl<T: LightChainClient + 'static> Filterable for EthClient<T> {
Vec::new() // light clients don't mine.
}
fn polls(&self) -> &Mutex<PollManager<PollFilter>> {
fn polls(&self) -> &Mutex<PollManager<SyncPollFilter>> {
&self.polls
}

View File

@ -17,7 +17,7 @@
//! Test implementation of miner service.
use std::sync::Arc;
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use bytes::Bytes;
use ethcore::account_provider::SignError as AccountError;
@ -219,6 +219,10 @@ impl MinerService for TestMinerService {
self.queued_transactions()
}
fn pending_transaction_hashes<C>(&self, _chain: &C) -> BTreeSet<H256> {
self.queued_transactions().into_iter().map(|tx| tx.signed().hash()).collect()
}
fn queued_transactions(&self) -> Vec<Arc<VerifiedTransaction>> {
self.pending_transactions.lock().values().cloned().map(|tx| {
Arc::new(VerifiedTransaction::from_pending_block_transaction(tx))

View File

@ -22,9 +22,8 @@ echo "Parity version: " $VER
echo "Branch: " $CI_BUILD_REF_NAME
echo "--------------------"
# NOTE for md5 and sha256 we want to display filename as well
# NOTE for sha256 we want to display filename as well
# hence we use --* instead of -p *
MD5_BIN="rhash --md5"
SHA256_BIN="rhash --sha256"
set_env () {
@ -77,22 +76,16 @@ calculate_checksums () {
echo "Checksum calculation:"
rhash --version
rm -rf *.md5
rm -rf *.sha256
BIN="target/$PLATFORM/release/parity$S3WIN"
export SHA3="$($BIN tools hash $BIN)"
echo "Parity file SHA3: $SHA3"
$MD5_BIN target/$PLATFORM/release/parity$S3WIN > parity$S3WIN.md5
$SHA256_BIN target/$PLATFORM/release/parity$S3WIN > parity$S3WIN.sha256
$MD5_BIN target/$PLATFORM/release/parity-evm$S3WIN > parity-evm$S3WIN.md5
$SHA256_BIN target/$PLATFORM/release/parity-evm$S3WIN > parity-evm$S3WIN.sha256
$MD5_BIN target/$PLATFORM/release/ethstore$S3WIN > ethstore$S3WIN.md5
$SHA256_BIN target/$PLATFORM/release/ethstore$S3WIN > ethstore$S3WIN.sha256
$MD5_BIN target/$PLATFORM/release/ethkey$S3WIN > ethkey$S3WIN.md5
$SHA256_BIN target/$PLATFORM/release/ethkey$S3WIN > ethkey$S3WIN.sha256
$MD5_BIN target/$PLATFORM/release/whisper$S3WIN > whisper$S3WIN.md5
$SHA256_BIN target/$PLATFORM/release/whisper$S3WIN > whisper$S3WIN.sha256
}
make_deb () {
@ -129,7 +122,6 @@ make_deb () {
cp target/$PLATFORM/release/ethkey deb/usr/bin/ethkey
cp target/$PLATFORM/release/whisper deb/usr/bin/whisper
dpkg-deb -b deb "parity_"$VER"_"$IDENT"_"$ARC".deb"
$MD5_BIN "parity_"$VER"_"$IDENT"_"$ARC".deb" > "parity_"$VER"_"$IDENT"_"$ARC".deb.md5"
$SHA256_BIN "parity_"$VER"_"$IDENT"_"$ARC".deb" > "parity_"$VER"_"$IDENT"_"$ARC".deb.sha256"
}
make_rpm () {
@ -144,7 +136,6 @@ make_rpm () {
rm -rf "parity-"$VER"-1."$ARC".rpm" || true
fpm -s dir -t rpm -n parity -v $VER --epoch 1 --license GPLv3 -d openssl --provides parity --url https://parity.io --vendor "Parity Technologies" -a x86_64 -m "<devops@parity.io>" --description "Ethereum network client by Parity Technologies" -C /install/
cp "parity-"$VER"-1."$ARC".rpm" "parity_"$VER"_"$IDENT"_"$ARC".rpm"
$MD5_BIN "parity_"$VER"_"$IDENT"_"$ARC".rpm" > "parity_"$VER"_"$IDENT"_"$ARC".rpm.md5"
$SHA256_BIN "parity_"$VER"_"$IDENT"_"$ARC".rpm" > "parity_"$VER"_"$IDENT"_"$ARC".rpm.sha256"
}
make_pkg () {
@ -160,7 +151,6 @@ make_pkg () {
packagesbuild -v mac/Parity.pkgproj
productsign --sign 'Developer ID Installer: PARITY TECHNOLOGIES LIMITED (P2PX3JU8FT)' target/release/Parity\ Ethereum.pkg target/release/Parity\ Ethereum-signed.pkg
mv target/release/Parity\ Ethereum-signed.pkg "parity_"$VER"_"$IDENT"_"$ARC".pkg"
$MD5_BIN "parity_"$VER"_"$IDENT"_"$ARC"."$EXT >> "parity_"$VER"_"$IDENT"_"$ARC".pkg.md5"
$SHA256_BIN "parity_"$VER"_"$IDENT"_"$ARC"."$EXT >> "parity_"$VER"_"$IDENT"_"$ARC".pkg.sha256"
}
sign_exe () {
@ -180,7 +170,6 @@ make_exe () {
cd ..
cp nsis/installer.exe "parity_"$VER"_"$IDENT"_"$ARC"."$EXT
./sign.cmd $keyfile $certpass "parity_"$VER"_"$IDENT"_"$ARC"."$EXT
$MD5_BIN "parity_"$VER"_"$IDENT"_"$ARC"."$EXT -p %h > "parity_"$VER"_"$IDENT"_"$ARC"."$EXT".md5"
$SHA256_BIN "parity_"$VER"_"$IDENT"_"$ARC"."$EXT -p %h > "parity_"$VER"_"$IDENT"_"$ARC"."$EXT".sha256"
}
push_binaries () {
@ -195,28 +184,22 @@ push_binaries () {
fi
aws s3 rm --recursive s3://$S3_BUCKET/$CI_BUILD_REF_NAME/$BUILD_PLATFORM
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/parity$S3WIN --body target/$PLATFORM/release/parity$S3WIN
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/parity$S3WIN.md5 --body parity$S3WIN.md5
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/parity$S3WIN.sha256 --body parity$S3WIN.sha256
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/parity-evm$S3WIN --body target/$PLATFORM/release/parity-evm$S3WIN
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/parity-evm$S3WIN.md5 --body parity-evm$S3WIN.md5
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/parity-evm$S3WIN.sha256 --body parity-evm$S3WIN.sha256
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/ethstore$S3WIN --body target/$PLATFORM/release/ethstore$S3WIN
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/ethstore$S3WIN.md5 --body ethstore$S3WIN.md5
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/ethstore$S3WIN.sha256 --body ethstore$S3WIN.sha256
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/ethkey$S3WIN --body target/$PLATFORM/release/ethkey$S3WIN
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/ethkey$S3WIN.md5 --body ethkey$S3WIN.md5
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/ethkey$S3WIN.sha256 --body ethkey$S3WIN.sha256
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/whisper$S3WIN --body target/$PLATFORM/release/whisper$S3WIN
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/whisper$S3WIN.md5 --body whisper$S3WIN.md5
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/whisper$S3WIN.sha256 --body whisper$S3WIN.sha256
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/"parity_"$VER"_"$IDENT"_"$ARC"."$EXT --body "parity_"$VER"_"$IDENT"_"$ARC"."$EXT
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/"parity_"$VER"_"$IDENT"_"$ARC"."$EXT".md5" --body "parity_"$VER"_"$IDENT"_"$ARC"."$EXT".md5"
aws s3api put-object --bucket $S3_BUCKET --key $CI_BUILD_REF_NAME/$BUILD_PLATFORM/"parity_"$VER"_"$IDENT"_"$ARC"."$EXT".sha256" --body "parity_"$VER"_"$IDENT"_"$ARC"."$EXT".sha256"
}
make_archive () {
echo "add artifacts to archive"
rm -rf parity.zip
zip -r parity.zip target/$PLATFORM/release/parity$S3WIN target/$PLATFORM/release/parity-evm$S3WIN target/$PLATFORM/release/ethstore$S3WIN target/$PLATFORM/release/ethkey$S3WIN target/$PLATFORM/release/whisper$S3WIN parity$S3WIN.md5 parity-evm$S3WIN.md5 ethstore$S3WIN.md5 ethkey$S3WIN.md5 whisper$S3WIN.md5 parity$S3WIN.sha256 parity-evm$S3WIN.sha256 ethstore$S3WIN.sha256 ethkey$S3WIN.sha256 whisper$S3WIN.sha256
zip -r parity.zip target/$PLATFORM/release/parity$S3WIN target/$PLATFORM/release/parity-evm$S3WIN target/$PLATFORM/release/ethstore$S3WIN target/$PLATFORM/release/ethkey$S3WIN target/$PLATFORM/release/whisper$S3WIN parity$S3WIN.sha256 parity-evm$S3WIN.sha256 ethstore$S3WIN.sha256 ethkey$S3WIN.sha256 whisper$S3WIN.sha256
}
updater_push_release () {
@ -347,11 +330,10 @@ case $BUILD_PLATFORM in
snapcraft push "parity_"$VER"_amd64.snap"
snapcraft status parity
snapcraft logout
$MD5_BIN "parity_"$VER"_amd64.snap" > "parity_"$VER"_amd64.snap.md5"
$SHA256_BIN "parity_"$VER"_amd64.snap" > "parity_"$VER"_amd64.snap.sha256"
echo "add artifacts to archive"
rm -rf parity.zip
zip -r parity.zip "parity_"$VER"_amd64.snap" "parity_"$VER"_amd64.snap.md5" "parity_"$VER"_amd64.snap.sha256"
zip -r parity.zip "parity_"$VER"_amd64.snap" "parity_"$VER"_amd64.snap.sha256"
;;
x86_64-pc-windows-msvc)
set_env_win

View File

@ -10,4 +10,6 @@ error-chain = "0.11"
log = "0.3"
smallvec = "0.4"
trace-time = { path = "../util/trace-time" }
[dev-dependencies]
ethereum-types = "0.3"

View File

@ -14,24 +14,26 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use ethereum_types::H256;
/// Error chain doesn't let us have generic types.
/// So the hashes are converted to debug strings for easy display.
type Hash = String;
error_chain! {
errors {
/// Transaction is already imported
AlreadyImported(hash: H256) {
AlreadyImported(hash: Hash) {
description("transaction is already in the pool"),
display("[{:?}] already imported", hash)
display("[{}] already imported", hash)
}
/// Transaction is too cheap to enter the queue
TooCheapToEnter(hash: H256, min_score: String) {
TooCheapToEnter(hash: Hash, min_score: String) {
description("the pool is full and transaction is too cheap to replace any transaction"),
display("[{:?}] too cheap to enter the pool. Min score: {}", hash, min_score)
display("[{}] too cheap to enter the pool. Min score: {}", hash, min_score)
}
/// Transaction is too cheap to replace existing transaction that occupies the same slot.
TooCheapToReplace(old_hash: H256, hash: H256) {
TooCheapToReplace(old_hash: Hash, hash: Hash) {
description("transaction is too cheap to replace existing transaction in the pool"),
display("[{:?}] too cheap to replace: {:?}", hash, old_hash)
display("[{}] too cheap to replace: {}", hash, old_hash)
}
}
}

View File

@ -69,14 +69,15 @@
#![warn(missing_docs)]
extern crate smallvec;
extern crate ethereum_types;
extern crate trace_time;
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate log;
extern crate trace_time;
#[cfg(test)]
extern crate ethereum_types;
#[cfg(test)]
mod tests;
@ -95,27 +96,29 @@ pub mod scoring;
pub use self::error::{Error, ErrorKind};
pub use self::listener::{Listener, NoopListener};
pub use self::options::Options;
pub use self::pool::{Pool, PendingIterator};
pub use self::pool::{Pool, PendingIterator, Transaction};
pub use self::ready::{Ready, Readiness};
pub use self::scoring::Scoring;
pub use self::status::{LightStatus, Status};
pub use self::verifier::Verifier;
use std::fmt;
use ethereum_types::{H256, Address};
use std::hash::Hash;
/// Already verified transaction that can be safely queued.
pub trait VerifiedTransaction: fmt::Debug {
/// Transaction hash type.
type Hash: fmt::Debug + fmt::LowerHex + Eq + Clone + Hash;
/// Transaction sender type.
type Sender: fmt::Debug + Eq + Clone + Hash;
/// Transaction hash
fn hash(&self) -> &H256;
fn hash(&self) -> &Self::Hash;
/// Memory usage
fn mem_usage(&self) -> usize;
/// Transaction sender
fn sender(&self) -> &Address;
/// Unique index of insertion (lower = older).
fn insertion_id(&self) -> u64;
fn sender(&self) -> &Self::Sender;
}

View File

@ -17,33 +17,61 @@
use std::sync::Arc;
use std::collections::{HashMap, BTreeSet};
use ethereum_types::{H160, H256};
use error;
use listener::{Listener, NoopListener};
use options::Options;
use ready::{Ready, Readiness};
use scoring::{Scoring, ScoreWithRef};
use scoring::{self, Scoring, ScoreWithRef};
use status::{LightStatus, Status};
use transactions::{AddResult, Transactions};
use {VerifiedTransaction};
type Sender = H160;
/// Internal representation of transaction.
///
/// Includes unique insertion id that can be used for scoring explictly,
/// but internally is used to resolve conflicts in case of equal scoring
/// (newer transactionsa are preferred).
#[derive(Debug)]
pub struct Transaction<T> {
/// Sequential id of the transaction
pub insertion_id: u64,
/// Shared transaction
pub transaction: Arc<T>,
}
impl<T> Clone for Transaction<T> {
fn clone(&self) -> Self {
Transaction {
insertion_id: self.insertion_id,
transaction: self.transaction.clone(),
}
}
}
impl<T> ::std::ops::Deref for Transaction<T> {
type Target = Arc<T>;
fn deref(&self) -> &Self::Target {
&self.transaction
}
}
/// A transaction pool.
#[derive(Debug)]
pub struct Pool<T, S: Scoring<T>, L = NoopListener> {
pub struct Pool<T: VerifiedTransaction, S: Scoring<T>, L = NoopListener> {
listener: L,
scoring: S,
options: Options,
mem_usage: usize,
transactions: HashMap<Sender, Transactions<T, S>>,
by_hash: HashMap<H256, Arc<T>>,
transactions: HashMap<T::Sender, Transactions<T, S>>,
by_hash: HashMap<T::Hash, Transaction<T>>,
best_transactions: BTreeSet<ScoreWithRef<T, S::Score>>,
worst_transactions: BTreeSet<ScoreWithRef<T, S::Score>>,
insertion_id: u64,
}
impl<T: VerifiedTransaction, S: Scoring<T> + Default> Default for Pool<T, S> {
@ -89,6 +117,7 @@ impl<T, S, L> Pool<T, S, L> where
by_hash,
best_transactions: Default::default(),
worst_transactions: Default::default(),
insertion_id: 0,
}
}
@ -104,41 +133,52 @@ impl<T, S, L> Pool<T, S, L> where
/// If any limit is reached the transaction with the lowest `Score` is evicted to make room.
///
/// The `Listener` will be informed on any drops or rejections.
pub fn import(&mut self, mut transaction: T) -> error::Result<Arc<T>> {
pub fn import(&mut self, transaction: T) -> error::Result<Arc<T>> {
let mem_usage = transaction.mem_usage();
ensure!(!self.by_hash.contains_key(transaction.hash()), error::ErrorKind::AlreadyImported(*transaction.hash()));
ensure!(!self.by_hash.contains_key(transaction.hash()), error::ErrorKind::AlreadyImported(format!("{:?}", transaction.hash())));
self.insertion_id += 1;
let transaction = Transaction {
insertion_id: self.insertion_id,
transaction: Arc::new(transaction),
};
// TODO [ToDr] Most likely move this after the transaction is inserted.
// Avoid using should_replace, but rather use scoring for that.
{
let remove_worst = |s: &mut Self, transaction| {
match s.remove_worst(&transaction) {
match s.remove_worst(transaction) {
Err(err) => {
s.listener.rejected(&Arc::new(transaction), err.kind());
s.listener.rejected(transaction, err.kind());
Err(err)
},
Ok(removed) => {
s.listener.dropped(&removed, Some(&transaction));
Ok(None) => Ok(false),
Ok(Some(removed)) => {
s.listener.dropped(&removed, Some(transaction));
s.finalize_remove(removed.hash());
Ok(transaction)
Ok(true)
},
}
};
while self.by_hash.len() + 1 > self.options.max_count {
trace!("Count limit reached: {} > {}", self.by_hash.len() + 1, self.options.max_count);
transaction = remove_worst(self, transaction)?;
if !remove_worst(self, &transaction)? {
break;
}
}
while self.mem_usage + mem_usage > self.options.max_mem_usage {
trace!("Mem limit reached: {} > {}", self.mem_usage + mem_usage, self.options.max_mem_usage);
transaction = remove_worst(self, transaction)?;
if !remove_worst(self, &transaction)? {
break;
}
}
}
let (result, prev_state, current_state) = {
let transactions = self.transactions.entry(*transaction.sender()).or_insert_with(Transactions::default);
let transactions = self.transactions.entry(transaction.sender().clone()).or_insert_with(Transactions::default);
// get worst and best transactions for comparison
let prev = transactions.worst_and_best();
let result = transactions.add(transaction, &self.scoring, self.options.max_per_sender);
@ -153,31 +193,31 @@ impl<T, S, L> Pool<T, S, L> where
AddResult::Ok(tx) => {
self.listener.added(&tx, None);
self.finalize_insert(&tx, None);
Ok(tx)
Ok(tx.transaction)
},
AddResult::PushedOut { new, old } |
AddResult::Replaced { new, old } => {
self.listener.added(&new, Some(&old));
self.finalize_insert(&new, Some(&old));
Ok(new)
Ok(new.transaction)
},
AddResult::TooCheap { new, old } => {
let error = error::ErrorKind::TooCheapToReplace(*old.hash(), *new.hash());
self.listener.rejected(&Arc::new(new), &error);
let error = error::ErrorKind::TooCheapToReplace(format!("{:x}", old.hash()), format!("{:x}", new.hash()));
self.listener.rejected(&new, &error);
bail!(error)
},
AddResult::TooCheapToEnter(new, score) => {
let error = error::ErrorKind::TooCheapToEnter(*new.hash(), format!("{:?}", score));
self.listener.rejected(&Arc::new(new), &error);
let error = error::ErrorKind::TooCheapToEnter(format!("{:x}", new.hash()), format!("{:?}", score));
self.listener.rejected(&new, &error);
bail!(error)
}
}
}
/// Updates state of the pool statistics if the transaction was added to a set.
fn finalize_insert(&mut self, new: &Arc<T>, old: Option<&Arc<T>>) {
fn finalize_insert(&mut self, new: &Transaction<T>, old: Option<&Transaction<T>>) {
self.mem_usage += new.mem_usage();
self.by_hash.insert(*new.hash(), new.clone());
self.by_hash.insert(new.hash().clone(), new.clone());
if let Some(old) = old {
self.finalize_remove(old.hash());
@ -185,23 +225,23 @@ impl<T, S, L> Pool<T, S, L> where
}
/// Updates the pool statistics if transaction was removed.
fn finalize_remove(&mut self, hash: &H256) -> Option<Arc<T>> {
fn finalize_remove(&mut self, hash: &T::Hash) -> Option<Arc<T>> {
self.by_hash.remove(hash).map(|old| {
self.mem_usage -= old.mem_usage();
old
self.mem_usage -= old.transaction.mem_usage();
old.transaction
})
}
/// Updates best and worst transactions from a sender.
fn update_senders_worst_and_best(
&mut self,
previous: Option<((S::Score, Arc<T>), (S::Score, Arc<T>))>,
current: Option<((S::Score, Arc<T>), (S::Score, Arc<T>))>,
previous: Option<((S::Score, Transaction<T>), (S::Score, Transaction<T>))>,
current: Option<((S::Score, Transaction<T>), (S::Score, Transaction<T>))>,
) {
let worst_collection = &mut self.worst_transactions;
let best_collection = &mut self.best_transactions;
let is_same = |a: &(S::Score, Arc<T>), b: &(S::Score, Arc<T>)| {
let is_same = |a: &(S::Score, Transaction<T>), b: &(S::Score, Transaction<T>)| {
a.0 == b.0 && a.1.hash() == b.1.hash()
};
@ -238,32 +278,42 @@ impl<T, S, L> Pool<T, S, L> where
}
/// Attempts to remove the worst transaction from the pool if it's worse than the given one.
fn remove_worst(&mut self, transaction: &T) -> error::Result<Arc<T>> {
///
/// Returns `None` in case we couldn't decide if the transaction should replace the worst transaction or not.
/// In such case we will accept the transaction even though it is going to exceed the limit.
fn remove_worst(&mut self, transaction: &Transaction<T>) -> error::Result<Option<Transaction<T>>> {
let to_remove = match self.worst_transactions.iter().next_back() {
// No elements to remove? and the pool is still full?
None => {
warn!("The pool is full but there are no transactions to remove.");
return Err(error::ErrorKind::TooCheapToEnter(*transaction.hash(), "unknown".into()).into());
return Err(error::ErrorKind::TooCheapToEnter(format!("{:?}", transaction.hash()), "unknown".into()).into());
},
Some(old) => if self.scoring.should_replace(&old.transaction, transaction) {
Some(old) => match self.scoring.should_replace(&old.transaction, transaction) {
// We can't decide which of them should be removed, so accept both.
scoring::Choice::InsertNew => None,
// New transaction is better than the worst one so we can replace it.
old.clone()
} else {
scoring::Choice::ReplaceOld => Some(old.clone()),
// otherwise fail
return Err(error::ErrorKind::TooCheapToEnter(*transaction.hash(), format!("{:?}", old.score)).into())
scoring::Choice::RejectNew => {
return Err(error::ErrorKind::TooCheapToEnter(format!("{:?}", transaction.hash()), format!("{:?}", old.score)).into())
},
},
};
// Remove from transaction set
self.remove_from_set(to_remove.transaction.sender(), |set, scoring| {
set.remove(&to_remove.transaction, scoring)
});
if let Some(to_remove) = to_remove {
// Remove from transaction set
self.remove_from_set(to_remove.transaction.sender(), |set, scoring| {
set.remove(&to_remove.transaction, scoring)
});
Ok(to_remove.transaction)
Ok(Some(to_remove.transaction))
} else {
Ok(None)
}
}
/// Removes transaction from sender's transaction `HashMap`.
fn remove_from_set<R, F: FnOnce(&mut Transactions<T, S>, &S) -> R>(&mut self, sender: &Sender, f: F) -> Option<R> {
fn remove_from_set<R, F: FnOnce(&mut Transactions<T, S>, &S) -> R>(&mut self, sender: &T::Sender, f: F) -> Option<R> {
let (prev, next, result) = if let Some(set) = self.transactions.get_mut(sender) {
let prev = set.worst_and_best();
let result = f(set, &self.scoring);
@ -286,14 +336,14 @@ impl<T, S, L> Pool<T, S, L> where
self.worst_transactions.clear();
for (_hash, tx) in self.by_hash.drain() {
self.listener.dropped(&tx, None)
self.listener.dropped(&tx.transaction, None)
}
}
/// Removes single transaction from the pool.
/// Depending on the `is_invalid` flag the listener
/// will either get a `cancelled` or `invalid` notification.
pub fn remove(&mut self, hash: &H256, is_invalid: bool) -> Option<Arc<T>> {
pub fn remove(&mut self, hash: &T::Hash, is_invalid: bool) -> Option<Arc<T>> {
if let Some(tx) = self.finalize_remove(hash) {
self.remove_from_set(tx.sender(), |set, scoring| {
set.remove(&tx, scoring)
@ -310,7 +360,7 @@ impl<T, S, L> Pool<T, S, L> where
}
/// Removes all stalled transactions from given sender.
fn remove_stalled<R: Ready<T>>(&mut self, sender: &Sender, ready: &mut R) -> usize {
fn remove_stalled<R: Ready<T>>(&mut self, sender: &T::Sender, ready: &mut R) -> usize {
let removed_from_set = self.remove_from_set(sender, |transactions, scoring| {
transactions.cull(ready, scoring)
});
@ -329,7 +379,7 @@ impl<T, S, L> Pool<T, S, L> where
}
/// Removes all stalled transactions from given sender list (or from all senders).
pub fn cull<R: Ready<T>>(&mut self, senders: Option<&[Sender]>, mut ready: R) -> usize {
pub fn cull<R: Ready<T>>(&mut self, senders: Option<&[T::Sender]>, mut ready: R) -> usize {
let mut removed = 0;
match senders {
Some(senders) => {
@ -349,13 +399,24 @@ impl<T, S, L> Pool<T, S, L> where
}
/// Returns a transaction if it's part of the pool or `None` otherwise.
pub fn find(&self, hash: &H256) -> Option<Arc<T>> {
self.by_hash.get(hash).cloned()
pub fn find(&self, hash: &T::Hash) -> Option<Arc<T>> {
self.by_hash.get(hash).map(|t| t.transaction.clone())
}
/// Returns worst transaction in the queue (if any).
pub fn worst_transaction(&self) -> Option<Arc<T>> {
self.worst_transactions.iter().next().map(|x| x.transaction.clone())
self.worst_transactions.iter().next_back().map(|x| x.transaction.transaction.clone())
}
/// Returns true if the pool is at it's capacity.
pub fn is_full(&self) -> bool {
self.by_hash.len() >= self.options.max_count
|| self.mem_usage >= self.options.max_mem_usage
}
/// Returns senders ordered by priority of their transactions.
pub fn senders(&self) -> impl Iterator<Item=&T::Sender> {
self.best_transactions.iter().map(|tx| tx.transaction.sender())
}
/// Returns an iterator of pending (ready) transactions.
@ -368,7 +429,7 @@ impl<T, S, L> Pool<T, S, L> where
}
/// Returns pending (ready) transactions from given sender.
pub fn pending_from_sender<R: Ready<T>>(&self, ready: R, sender: &Sender) -> PendingIterator<T, R, S, L> {
pub fn pending_from_sender<R: Ready<T>>(&self, ready: R, sender: &T::Sender) -> PendingIterator<T, R, S, L> {
let best_transactions = self.transactions.get(sender)
.and_then(|transactions| transactions.worst_and_best())
.map(|(_, best)| ScoreWithRef::new(best.0, best.1))
@ -387,7 +448,7 @@ impl<T, S, L> Pool<T, S, L> where
}
/// Update score of transactions of a particular sender.
pub fn update_scores(&mut self, sender: &Sender, event: S::Event) {
pub fn update_scores(&mut self, sender: &T::Sender, event: S::Event) {
let res = if let Some(set) = self.transactions.get_mut(sender) {
let prev = set.worst_and_best();
set.update_scores(&self.scoring, event);
@ -410,7 +471,7 @@ impl<T, S, L> Pool<T, S, L> where
let len = transactions.len();
for (idx, tx) in transactions.iter().enumerate() {
match ready.is_ready(tx) {
Readiness::Stalled => status.stalled += 1,
Readiness::Stale => status.stalled += 1,
Readiness::Ready => status.pending += 1,
Readiness::Future => {
status.future += len - idx;
@ -442,6 +503,11 @@ impl<T, S, L> Pool<T, S, L> where
&self.listener
}
/// Borrows scoring instance.
pub fn scoring(&self) -> &S {
&self.scoring
}
/// Borrows listener mutably.
pub fn listener_mut(&mut self) -> &mut L {
&mut self.listener
@ -485,7 +551,7 @@ impl<'a, T, R, S, L> Iterator for PendingIterator<'a, T, R, S, L> where
self.best_transactions.insert(ScoreWithRef::new(score, tx));
}
return Some(best.transaction)
return Some(best.transaction.transaction)
},
state => trace!("[{:?}] Ignoring {:?} transaction.", best.transaction.hash(), state),
}

View File

@ -17,8 +17,8 @@
/// Transaction readiness.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Readiness {
/// The transaction is stalled (and should/will be removed from the pool).
Stalled,
/// The transaction is stale (and should/will be removed from the pool).
Stale,
/// The transaction is ready to be included in pending set.
Ready,
/// The transaction is not yet ready.

View File

@ -17,9 +17,7 @@
//! A transactions ordering abstraction.
use std::{cmp, fmt};
use std::sync::Arc;
use {VerifiedTransaction};
use pool::Transaction;
/// Represents a decision what to do with
/// a new transaction that tries to enter the pool.
@ -98,10 +96,12 @@ pub trait Scoring<T>: fmt::Debug {
/// Updates the transaction scores given a list of transactions and a change to previous scoring.
/// NOTE: you can safely assume that both slices have the same length.
/// (i.e. score at index `i` represents transaction at the same index)
fn update_scores(&self, txs: &[Arc<T>], scores: &mut [Self::Score], change: Change<Self::Event>);
fn update_scores(&self, txs: &[Transaction<T>], scores: &mut [Self::Score], change: Change<Self::Event>);
/// Decides if `new` should push out `old` transaction from the pool.
fn should_replace(&self, old: &T, new: &T) -> bool;
///
/// NOTE returning `InsertNew` here can lead to some transactions being accepted above pool limits.
fn should_replace(&self, old: &T, new: &T) -> Choice;
}
/// A score with a reference to the transaction.
@ -110,7 +110,14 @@ pub struct ScoreWithRef<T, S> {
/// Score
pub score: S,
/// Shared transaction
pub transaction: Arc<T>,
pub transaction: Transaction<T>,
}
impl<T, S> ScoreWithRef<T, S> {
/// Creates a new `ScoreWithRef`
pub fn new(score: S, transaction: Transaction<T>) -> Self {
ScoreWithRef { score, transaction }
}
}
impl<T, S: Clone> Clone for ScoreWithRef<T, S> {
@ -122,30 +129,23 @@ impl<T, S: Clone> Clone for ScoreWithRef<T, S> {
}
}
impl<T, S> ScoreWithRef<T, S> {
/// Creates a new `ScoreWithRef`
pub fn new(score: S, transaction: Arc<T>) -> Self {
ScoreWithRef { score, transaction }
}
}
impl<S: cmp::Ord, T: VerifiedTransaction> Ord for ScoreWithRef<T, S> {
impl<S: cmp::Ord, T> Ord for ScoreWithRef<T, S> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
other.score.cmp(&self.score)
.then(other.transaction.insertion_id().cmp(&self.transaction.insertion_id()))
.then(other.transaction.insertion_id.cmp(&self.transaction.insertion_id))
}
}
impl<S: cmp::Ord, T: VerifiedTransaction> PartialOrd for ScoreWithRef<T, S> {
impl<S: cmp::Ord, T> PartialOrd for ScoreWithRef<T, S> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<S: cmp::Ord, T: VerifiedTransaction> PartialEq for ScoreWithRef<T, S> {
impl<S: cmp::Ord, T> PartialEq for ScoreWithRef<T, S> {
fn eq(&self, other: &Self) -> bool {
self.score == other.score && self.transaction.insertion_id() == other.transaction.insertion_id()
self.score == other.score && self.transaction.insertion_id == other.transaction.insertion_id
}
}
impl<S: cmp::Ord, T: VerifiedTransaction> Eq for ScoreWithRef<T, S> {}
impl<S: cmp::Ord, T> Eq for ScoreWithRef<T, S> {}

View File

@ -17,12 +17,22 @@
use std::cmp;
use std::collections::HashMap;
use ethereum_types::U256;
use {scoring, Scoring, Ready, Readiness, Address as Sender};
use super::{Transaction, SharedTransaction};
use ethereum_types::{H160 as Sender, U256};
use {pool, scoring, Scoring, Ready, Readiness};
use super::Transaction;
#[derive(Debug, Default)]
pub struct DummyScoring;
pub struct DummyScoring {
always_insert: bool,
}
impl DummyScoring {
pub fn always_insert() -> Self {
DummyScoring {
always_insert: true,
}
}
}
impl Scoring<Transaction> for DummyScoring {
type Score = U256;
@ -44,7 +54,7 @@ impl Scoring<Transaction> for DummyScoring {
}
}
fn update_scores(&self, txs: &[SharedTransaction], scores: &mut [Self::Score], change: scoring::Change) {
fn update_scores(&self, txs: &[pool::Transaction<Transaction>], scores: &mut [Self::Score], change: scoring::Change) {
if let scoring::Change::Event(_) = change {
// In case of event reset all scores to 0
for i in 0..txs.len() {
@ -58,8 +68,14 @@ impl Scoring<Transaction> for DummyScoring {
}
}
fn should_replace(&self, old: &Transaction, new: &Transaction) -> bool {
new.gas_price > old.gas_price
fn should_replace(&self, old: &Transaction, new: &Transaction) -> scoring::Choice {
if self.always_insert {
scoring::Choice::InsertNew
} else if new.gas_price > old.gas_price {
scoring::Choice::ReplaceOld
} else {
scoring::Choice::RejectNew
}
}
}
@ -84,7 +100,7 @@ impl Ready<Transaction> for NonceReady {
*nonce = *nonce + 1.into();
Readiness::Ready
},
cmp::Ordering::Less => Readiness::Stalled,
cmp::Ordering::Less => Readiness::Stale,
}
}
}

View File

@ -32,21 +32,31 @@ pub struct Transaction {
pub gas_price: U256,
pub gas: U256,
pub sender: Address,
pub insertion_id: u64,
pub mem_usage: usize,
}
impl VerifiedTransaction for Transaction {
type Hash = H256;
type Sender = Address;
fn hash(&self) -> &H256 { &self.hash }
fn mem_usage(&self) -> usize { self.mem_usage }
fn sender(&self) -> &Address { &self.sender }
fn insertion_id(&self) -> u64 { self.insertion_id }
}
pub type SharedTransaction = Arc<Transaction>;
type TestPool = Pool<Transaction, DummyScoring>;
impl TestPool {
pub fn with_limit(max_count: usize) -> Self {
Self::with_options(Options {
max_count,
..Default::default()
})
}
}
#[test]
fn should_clear_queue() {
// given
@ -123,7 +133,7 @@ fn should_reject_if_above_count() {
// Reject second
let tx1 = b.tx().nonce(0).new();
let tx2 = b.tx().nonce(1).new();
let hash = *tx2.hash();
let hash = format!("{:?}", tx2.hash());
txq.import(tx1).unwrap();
assert_eq!(txq.import(tx2).unwrap_err().kind(), &error::ErrorKind::TooCheapToEnter(hash, "0x0".into()));
assert_eq!(txq.light_status().transaction_count, 1);
@ -149,7 +159,7 @@ fn should_reject_if_above_mem_usage() {
// Reject second
let tx1 = b.tx().nonce(1).mem_usage(1).new();
let tx2 = b.tx().nonce(2).mem_usage(2).new();
let hash = *tx2.hash();
let hash = format!("{:?}", tx2.hash());
txq.import(tx1).unwrap();
assert_eq!(txq.import(tx2).unwrap_err().kind(), &error::ErrorKind::TooCheapToEnter(hash, "0x0".into()));
assert_eq!(txq.light_status().transaction_count, 1);
@ -175,7 +185,7 @@ fn should_reject_if_above_sender_count() {
// Reject second
let tx1 = b.tx().nonce(1).new();
let tx2 = b.tx().nonce(2).new();
let hash = *tx2.hash();
let hash = format!("{:x}", tx2.hash());
txq.import(tx1).unwrap();
assert_eq!(txq.import(tx2).unwrap_err().kind(), &error::ErrorKind::TooCheapToEnter(hash, "0x0".into()));
assert_eq!(txq.light_status().transaction_count, 1);
@ -185,7 +195,7 @@ fn should_reject_if_above_sender_count() {
// Replace first
let tx1 = b.tx().nonce(1).new();
let tx2 = b.tx().nonce(2).gas_price(2).new();
let hash = *tx2.hash();
let hash = format!("{:x}", tx2.hash());
txq.import(tx1).unwrap();
// This results in error because we also compare nonces
assert_eq!(txq.import(tx2).unwrap_err().kind(), &error::ErrorKind::TooCheapToEnter(hash, "0x0".into()));
@ -444,9 +454,81 @@ fn should_return_worst_transaction() {
// when
txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
txq.import(b.tx().sender(1).nonce(0).gas_price(4).new()).unwrap();
// then
assert!(txq.worst_transaction().is_some());
assert_eq!(txq.worst_transaction().unwrap().gas_price, 4.into());
}
#[test]
fn should_return_is_full() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::with_limit(2);
assert!(!txq.is_full());
// when
txq.import(b.tx().nonce(0).gas_price(110).new()).unwrap();
assert!(!txq.is_full());
txq.import(b.tx().sender(1).nonce(0).gas_price(100).new()).unwrap();
// then
assert!(txq.is_full());
}
#[test]
fn should_import_even_if_limit_is_reached_and_should_replace_returns_insert_new() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::with_scoring(DummyScoring::always_insert(), Options {
max_count: 1,
..Default::default()
});
txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
assert_eq!(txq.light_status(), LightStatus {
transaction_count: 1,
senders: 1,
mem_usage: 0,
});
// when
txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap();
// then
assert_eq!(txq.light_status(), LightStatus {
transaction_count: 2,
senders: 1,
mem_usage: 0,
});
}
#[test]
fn should_not_import_even_if_limit_is_reached_and_should_replace_returns_false() {
// given
let b = TransactionBuilder::default();
let mut txq = TestPool::with_scoring(DummyScoring::default(), Options {
max_count: 1,
..Default::default()
});
txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap();
assert_eq!(txq.light_status(), LightStatus {
transaction_count: 1,
senders: 1,
mem_usage: 0,
});
// when
let err = txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap_err();
// then
assert_eq!(err.kind(),
&error::ErrorKind::TooCheapToEnter("0x00000000000000000000000000000000000000000000000000000000000001f5".into(), "0x5".into()));
assert_eq!(txq.light_status(), LightStatus {
transaction_count: 1,
senders: 1,
mem_usage: 0,
});
}
mod listener {
@ -489,7 +571,7 @@ mod listener {
let b = TransactionBuilder::default();
let listener = MyListener::default();
let results = listener.0.clone();
let mut txq = Pool::new(listener, DummyScoring, Options {
let mut txq = Pool::new(listener, DummyScoring::default(), Options {
max_per_sender: 1,
max_count: 2,
..Default::default()
@ -527,7 +609,7 @@ mod listener {
let b = TransactionBuilder::default();
let listener = MyListener::default();
let results = listener.0.clone();
let mut txq = Pool::new(listener, DummyScoring, Options::default());
let mut txq = Pool::new(listener, DummyScoring::default(), Options::default());
// insert
let tx1 = txq.import(b.tx().nonce(1).new()).unwrap();
@ -546,7 +628,7 @@ mod listener {
let b = TransactionBuilder::default();
let listener = MyListener::default();
let results = listener.0.clone();
let mut txq = Pool::new(listener, DummyScoring, Options::default());
let mut txq = Pool::new(listener, DummyScoring::default(), Options::default());
// insert
txq.import(b.tx().nonce(1).new()).unwrap();
@ -564,7 +646,7 @@ mod listener {
let b = TransactionBuilder::default();
let listener = MyListener::default();
let results = listener.0.clone();
let mut txq = Pool::new(listener, DummyScoring, Options::default());
let mut txq = Pool::new(listener, DummyScoring::default(), Options::default());
// insert
txq.import(b.tx().nonce(1).new()).unwrap();

View File

@ -14,9 +14,6 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::rc::Rc;
use std::cell::Cell;
use super::{Transaction, U256, Address};
#[derive(Debug, Default, Clone)]
@ -26,7 +23,6 @@ pub struct TransactionBuilder {
gas: U256,
sender: Address,
mem_usage: usize,
insertion_id: Rc<Cell<u64>>,
}
impl TransactionBuilder {
@ -55,11 +51,6 @@ impl TransactionBuilder {
}
pub fn new(self) -> Transaction {
let insertion_id = {
let id = self.insertion_id.get() + 1;
self.insertion_id.set(id);
id
};
let hash = self.nonce ^ (U256::from(100) * self.gas_price) ^ (U256::from(100_000) * U256::from(self.sender.low_u64()));
Transaction {
hash: hash.into(),
@ -67,7 +58,6 @@ impl TransactionBuilder {
gas_price: self.gas_price,
gas: 21_000.into(),
sender: self.sender,
insertion_id,
mem_usage: self.mem_usage,
}
}

View File

@ -15,28 +15,28 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::{fmt, mem};
use std::sync::Arc;
use smallvec::SmallVec;
use ready::{Ready, Readiness};
use scoring::{self, Scoring};
use pool::Transaction;
#[derive(Debug)]
pub enum AddResult<T, S> {
Ok(Arc<T>),
Ok(T),
TooCheapToEnter(T, S),
TooCheap {
old: Arc<T>,
old: T,
new: T,
},
Replaced {
old: Arc<T>,
new: Arc<T>,
old: T,
new: T,
},
PushedOut {
old: Arc<T>,
new: Arc<T>,
old: T,
new: T,
},
}
@ -45,7 +45,7 @@ const PER_SENDER: usize = 8;
#[derive(Debug)]
pub struct Transactions<T, S: Scoring<T>> {
// TODO [ToDr] Consider using something that doesn't require shifting all records.
transactions: SmallVec<[Arc<T>; PER_SENDER]>,
transactions: SmallVec<[Transaction<T>; PER_SENDER]>,
scores: SmallVec<[S::Score; PER_SENDER]>,
}
@ -67,11 +67,11 @@ impl<T: fmt::Debug, S: Scoring<T>> Transactions<T, S> {
self.transactions.len()
}
pub fn iter(&self) -> ::std::slice::Iter<Arc<T>> {
pub fn iter(&self) -> ::std::slice::Iter<Transaction<T>> {
self.transactions.iter()
}
pub fn worst_and_best(&self) -> Option<((S::Score, Arc<T>), (S::Score, Arc<T>))> {
pub fn worst_and_best(&self) -> Option<((S::Score, Transaction<T>), (S::Score, Transaction<T>))> {
let len = self.scores.len();
self.scores.get(0).cloned().map(|best| {
let worst = self.scores[len - 1].clone();
@ -82,7 +82,7 @@ impl<T: fmt::Debug, S: Scoring<T>> Transactions<T, S> {
})
}
pub fn find_next(&self, tx: &T, scoring: &S) -> Option<(S::Score, Arc<T>)> {
pub fn find_next(&self, tx: &T, scoring: &S) -> Option<(S::Score, Transaction<T>)> {
self.transactions.binary_search_by(|old| scoring.compare(old, &tx)).ok().and_then(|index| {
let index = index + 1;
if index < self.scores.len() {
@ -93,18 +93,17 @@ impl<T: fmt::Debug, S: Scoring<T>> Transactions<T, S> {
})
}
fn push_cheapest_transaction(&mut self, tx: T, scoring: &S, max_count: usize) -> AddResult<T, S::Score> {
fn push_cheapest_transaction(&mut self, tx: Transaction<T>, scoring: &S, max_count: usize) -> AddResult<Transaction<T>, S::Score> {
let index = self.transactions.len();
if index == max_count {
let min_score = self.scores[index - 1].clone();
AddResult::TooCheapToEnter(tx, min_score)
} else {
let shared = Arc::new(tx);
self.transactions.push(shared.clone());
self.transactions.push(tx.clone());
self.scores.push(Default::default());
scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::InsertedAt(index));
AddResult::Ok(shared)
AddResult::Ok(tx)
}
}
@ -112,28 +111,26 @@ impl<T: fmt::Debug, S: Scoring<T>> Transactions<T, S> {
scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::Event(event));
}
pub fn add(&mut self, tx: T, scoring: &S, max_count: usize) -> AddResult<T, S::Score> {
let index = match self.transactions.binary_search_by(|old| scoring.compare(old, &tx)) {
pub fn add(&mut self, new: Transaction<T>, scoring: &S, max_count: usize) -> AddResult<Transaction<T>, S::Score> {
let index = match self.transactions.binary_search_by(|old| scoring.compare(old, &new)) {
Ok(index) => index,
Err(index) => index,
};
// Insert at the end.
if index == self.transactions.len() {
return self.push_cheapest_transaction(tx, scoring, max_count)
return self.push_cheapest_transaction(new, scoring, max_count)
}
// Decide if the transaction should replace some other.
match scoring.choose(&self.transactions[index], &tx) {
match scoring.choose(&self.transactions[index], &new) {
// New transaction should be rejected
scoring::Choice::RejectNew => AddResult::TooCheap {
old: self.transactions[index].clone(),
new: tx,
new,
},
// New transaction should be kept along with old ones.
scoring::Choice::InsertNew => {
let new = Arc::new(tx);
self.transactions.insert(index, new.clone());
self.scores.insert(index, Default::default());
scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::InsertedAt(index));
@ -153,7 +150,6 @@ impl<T: fmt::Debug, S: Scoring<T>> Transactions<T, S> {
},
// New transaction is replacing some other transaction already in the queue.
scoring::Choice::ReplaceOld => {
let new = Arc::new(tx);
let old = mem::replace(&mut self.transactions[index], new.clone());
scoring.update_scores(&self.transactions, &mut self.scores, scoring::Change::ReplacedAt(index));
@ -181,7 +177,7 @@ impl<T: fmt::Debug, S: Scoring<T>> Transactions<T, S> {
return true;
}
pub fn cull<R: Ready<T>>(&mut self, ready: &mut R, scoring: &S) -> SmallVec<[Arc<T>; PER_SENDER]> {
pub fn cull<R: Ready<T>>(&mut self, ready: &mut R, scoring: &S) -> SmallVec<[Transaction<T>; PER_SENDER]> {
let mut result = SmallVec::new();
if self.is_empty() {
return result;
@ -190,7 +186,7 @@ impl<T: fmt::Debug, S: Scoring<T>> Transactions<T, S> {
let mut first_non_stalled = 0;
for tx in &self.transactions {
match ready.is_ready(tx) {
Readiness::Stalled => {
Readiness::Stale => {
first_non_stalled += 1;
},
Readiness::Ready | Readiness::Future => break,

View File

@ -80,10 +80,6 @@ pub enum Algorithm {
RefCounted,
}
impl Default for Algorithm {
fn default() -> Algorithm { Algorithm::OverlayRecent }
}
impl str::FromStr for Algorithm {
type Err = String;
@ -181,11 +177,6 @@ mod tests {
assert!(!Algorithm::RefCounted.is_stable());
}
#[test]
fn test_journal_algorithm_default() {
assert_eq!(Algorithm::default(), Algorithm::OverlayRecent);
}
#[test]
fn test_journal_algorithm_all_types() {
// compiling should fail if some cases are not covered

View File

@ -3,7 +3,7 @@
[package]
name = "parity-version"
# NOTE: this value is used for Parity version string (via env CARGO_PKG_VERSION)
version = "1.11.5"
version = "1.11.6"
authors = ["Parity Technologies <admin@parity.io>"]
build = "build.rs"