mixed merge and changes...

This commit is contained in:
keorn
2016-10-11 18:37:31 +01:00
parent 1f56588b87
commit e343153f06
58 changed files with 1397 additions and 621 deletions

View File

@@ -18,7 +18,7 @@ extern crate ethcore_ipc_codegen;
fn main() {
ethcore_ipc_codegen::derive_binary("src/types/mod.rs.in").unwrap();
ethcore_ipc_codegen::derive_ipc("src/client/traits.rs").unwrap();
ethcore_ipc_codegen::derive_ipc("src/snapshot/snapshot_service_trait.rs").unwrap();
ethcore_ipc_codegen::derive_ipc("src/client/chain_notify.rs").unwrap();
ethcore_ipc_codegen::derive_ipc_cond("src/client/traits.rs", cfg!(feature="ipc")).unwrap();
ethcore_ipc_codegen::derive_ipc_cond("src/snapshot/snapshot_service_trait.rs", cfg!(feature="ipc")).unwrap();
ethcore_ipc_codegen::derive_ipc_cond("src/client/chain_notify.rs", cfg!(feature="ipc")).unwrap();
}

View File

@@ -149,13 +149,6 @@ pub struct Client {
/// assume finality of a given candidate.
pub const HISTORY: u64 = 1200;
/// Append a path element to the given path and return the string.
pub fn append_path<P>(path: P, item: &str) -> String where P: AsRef<Path> {
let mut p = path.as_ref().to_path_buf();
p.push(item);
p.to_str().unwrap().to_owned()
}
impl Client {
/// Create a new client with given spec and DB path and custom verifier.
pub fn new(
@@ -169,7 +162,7 @@ impl Client {
let path = path.to_path_buf();
let gb = spec.genesis_block();
let db = Arc::new(try!(Database::open(&db_config, &path.to_str().unwrap()).map_err(ClientError::Database)));
let db = Arc::new(try!(Database::open(&db_config, &path.to_str().expect("DB path could not be converted to string.")).map_err(ClientError::Database)));
let chain = Arc::new(BlockChain::new(config.blockchain.clone(), &gb, db.clone(), spec.engine.clone()));
let tracedb = RwLock::new(TraceDB::new(config.tracing.clone(), db.clone(), chain.clone()));
@@ -298,31 +291,27 @@ impl Client {
// Check if Parent is in chain
let chain_has_parent = chain.block_header(header.parent_hash());
if let None = chain_has_parent {
if let Some(parent) = chain_has_parent {
// Enact Verified Block
let last_hashes = self.build_last_hashes(header.parent_hash().clone());
let db = self.state_db.lock().boxed_clone_canon(&header.parent_hash());
let enact_result = enact_verified(block, engine, self.tracedb.read().tracing_enabled(), db, &parent, last_hashes, self.factories.clone());
let locked_block = try!(enact_result.map_err(|e| {
warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
}));
// Final Verification
if let Err(e) = self.verifier.verify_block_final(header, locked_block.block().header()) {
warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
return Err(());
}
Ok(locked_block)
} else {
warn!(target: "client", "Block import failed for #{} ({}): Parent not found ({}) ", header.number(), header.hash(), header.parent_hash());
return Err(());
};
// Enact Verified Block
let parent = chain_has_parent.unwrap();
let last_hashes = self.build_last_hashes(header.parent_hash().clone());
let is_canon = header.parent_hash() == &chain.best_block_hash();
let db = if is_canon { self.state_db.lock().boxed_clone_canon() } else { self.state_db.lock().boxed_clone() };
let enact_result = enact_verified(block, engine, self.tracedb.read().tracing_enabled(), db, &parent, last_hashes, self.factories.clone());
if let Err(e) = enact_result {
warn!(target: "client", "Block import failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
return Err(());
};
// Final Verification
let locked_block = enact_result.unwrap();
if let Err(e) = self.verifier.verify_block_final(header, locked_block.block().header()) {
warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e);
return Err(());
Err(())
}
Ok(locked_block)
}
fn calculate_enacted_retracted(&self, import_results: &[ImportRoute]) -> (Vec<H256>, Vec<H256>) {
@@ -366,23 +355,21 @@ impl Client {
for block in blocks {
let header = &block.header;
if invalid_blocks.contains(header.parent_hash()) {
let is_invalid = invalid_blocks.contains(header.parent_hash());
if is_invalid {
invalid_blocks.insert(header.hash());
continue;
}
let closed_block = self.check_and_close_block(&block);
if let Err(_) = closed_block {
if let Ok(closed_block) = self.check_and_close_block(&block) {
imported_blocks.push(header.hash());
let route = self.commit_block(closed_block, &header.hash(), &block.bytes);
import_results.push(route);
self.report.write().accrue_block(&block);
} else {
invalid_blocks.insert(header.hash());
continue;
}
let closed_block = closed_block.unwrap();
imported_blocks.push(header.hash());
let route = self.commit_block(closed_block, &header.hash(), &block.bytes);
import_results.push(route);
self.report.write().accrue_block(&block);
}
let imported = imported_blocks.len();
@@ -432,7 +419,7 @@ impl Client {
// Are we committing an era?
let ancient = if number >= HISTORY {
let n = number - HISTORY;
Some((n, chain.block_hash(n).unwrap()))
Some((n, chain.block_hash(n).expect("only verified blocks can be commited; verified block has hash; qed")))
} else {
None
};
@@ -461,6 +448,8 @@ impl Client {
enacted: route.enacted.clone(),
retracted: route.retracted.len()
});
let is_canon = route.enacted.last().map_or(false, |h| h == hash);
state.sync_cache(&route.enacted, &route.retracted, is_canon);
// Final commit to the DB
self.db.read().write_buffered(batch);
chain.commit();
@@ -535,9 +524,11 @@ impl Client {
/// Get a copy of the best block's state.
pub fn state(&self) -> State {
let header = self.best_block_header();
let header = HeaderView::new(&header);
State::from_existing(
self.state_db.lock().boxed_clone(),
HeaderView::new(&self.best_block_header()).state_root(),
self.state_db.lock().boxed_clone_canon(&header.hash()),
header.state_root(),
self.engine.account_start_nonce(),
self.factories.clone())
.expect("State root of best block header always valid.")
@@ -899,8 +890,10 @@ impl BlockChainClient for Client {
BodyView::new(&block).localized_transaction_at(&address.block_hash, block_number, address.index)
});
match (t, chain.transaction_receipt(&address)) {
(Some(tx), Some(receipt)) => {
let tx_and_sender = t.and_then(|tx| tx.sender().ok().map(|sender| (tx, sender)));
match (tx_and_sender, chain.transaction_receipt(&address)) {
(Some((tx, sender)), Some(receipt)) => {
let block_hash = tx.block_hash.clone();
let block_number = tx.block_number.clone();
let transaction_hash = tx.hash();
@@ -922,7 +915,7 @@ impl BlockChainClient for Client {
gas_used: receipt.gas_used - prior_gas_used,
contract_address: match tx.action {
Action::Call(_) => None,
Action::Create => Some(contract_address(&tx.sender().unwrap(), &tx.nonce))
Action::Create => Some(contract_address(&sender, &tx.nonce))
},
logs: receipt.logs.into_iter().enumerate().map(|(i, log)| LocalizedLogEntry {
entry: log,
@@ -1023,17 +1016,18 @@ impl BlockChainClient for Client {
let start = self.block_number(filter.range.start);
let end = self.block_number(filter.range.end);
if start.is_some() && end.is_some() {
let filter = trace::Filter {
range: start.unwrap() as usize..end.unwrap() as usize,
from_address: From::from(filter.from_address),
to_address: From::from(filter.to_address),
};
match (start, end) {
(Some(s), Some(e)) => {
let filter = trace::Filter {
range: s as usize..e as usize,
from_address: From::from(filter.from_address),
to_address: From::from(filter.to_address),
};
let traces = self.tracedb.read().filter(&filter);
Some(traces)
} else {
None
let traces = self.tracedb.read().filter(&filter);
Some(traces)
},
_ => None,
}
}
@@ -1080,7 +1074,7 @@ impl BlockChainClient for Client {
}
fn pending_transactions(&self) -> Vec<SignedTransaction> {
self.miner.pending_transactions()
self.miner.pending_transactions(self.chain.read().best_block_number())
}
// TODO: Make it an actual queue, return errors.
@@ -1109,7 +1103,7 @@ impl MiningBlockChainClient for Client {
engine,
self.factories.clone(),
false, // TODO: this will need to be parameterised once we want to do immediate mining insertion.
self.state_db.lock().boxed_clone(),
self.state_db.lock().boxed_clone_canon(&h),
&chain.block_header(&h).expect("h is best block hash: so its header must exist: qed"),
self.build_last_hashes(h.clone()),
author,
@@ -1120,11 +1114,15 @@ impl MiningBlockChainClient for Client {
// Add uncles
chain
.find_uncle_headers(&h, engine.maximum_uncle_age())
.unwrap()
.unwrap_or_else(Vec::new)
.into_iter()
.take(engine.maximum_uncle_count())
.foreach(|h| {
open_block.push_uncle(h).unwrap();
open_block.push_uncle(h).expect("pushing maximum_uncle_count;
open_block was just created;
push_uncle is not ok only if more than maximum_uncle_count is pushed;
so all push_uncle are Ok;
qed");
});
open_block
@@ -1145,6 +1143,7 @@ impl MiningBlockChainClient for Client {
let block_data = block.rlp_bytes();
let route = self.commit_block(block, &h, &block_data);
trace!(target: "client", "Imported sealed block #{} ({})", number, h);
self.state_db.lock().sync_cache(&route.enacted, &route.retracted, false);
let (enacted, retracted) = self.calculate_enacted_retracted(&[route]);
self.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted);

View File

@@ -30,13 +30,20 @@ pub use self::test_client::{TestBlockChainClient, EachBlockWith};
pub use types::trace_filter::Filter as TraceFilter;
pub use executive::{Executed, Executive, TransactOptions};
pub use env_info::{LastHashes, EnvInfo};
pub use self::chain_notify::{ChainNotify, ChainNotifyClient};
pub use self::chain_notify::ChainNotify;
pub use types::call_analytics::CallAnalytics;
pub use block_import_error::BlockImportError;
pub use transaction_import::TransactionImportResult;
pub use transaction_import::TransactionImportError;
pub use self::traits::{BlockChainClient, MiningBlockChainClient, RemoteClient};
pub use self::traits::{BlockChainClient, MiningBlockChainClient};
/// IPC interfaces
#[cfg(feature="ipc")]
pub mod remote {
pub use super::traits::RemoteClient;
pub use super::chain_notify::ChainNotifyClient;
}
mod traits {
#![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues

View File

@@ -55,6 +55,8 @@ pub struct TestBlockChainClient {
pub genesis_hash: H256,
/// Last block hash.
pub last_hash: RwLock<H256>,
/// Extra data do set for each block
pub extra_data: Bytes,
/// Difficulty.
pub difficulty: RwLock<U256>,
/// Balances.
@@ -105,11 +107,17 @@ impl Default for TestBlockChainClient {
impl TestBlockChainClient {
/// Creates new test client.
pub fn new() -> Self {
Self::new_with_extra_data(Bytes::new())
}
/// Creates new test client with specified extra data for each block
pub fn new_with_extra_data(extra_data: Bytes) -> Self {
let spec = Spec::new_test();
let mut client = TestBlockChainClient {
blocks: RwLock::new(HashMap::new()),
numbers: RwLock::new(HashMap::new()),
genesis_hash: H256::new(),
extra_data: extra_data,
last_hash: RwLock::new(H256::new()),
difficulty: RwLock::new(From::from(0)),
balances: RwLock::new(HashMap::new()),
@@ -129,7 +137,7 @@ impl TestBlockChainClient {
client.genesis_hash = client.last_hash.read().clone();
client
}
/// Set the transaction receipt result
pub fn set_transaction_receipt(&self, id: TransactionID, receipt: LocalizedReceipt) {
self.receipts.write().insert(id, receipt);
@@ -184,6 +192,7 @@ impl TestBlockChainClient {
header.set_parent_hash(self.last_hash.read().clone());
header.set_number(n as BlockNumber);
header.set_gas_limit(U256::from(1_000_000));
header.set_extra_data(self.extra_data.clone());
let uncles = match with {
EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => {
let mut uncles = RlpStream::new_list(1);
@@ -606,6 +615,6 @@ impl BlockChainClient for TestBlockChainClient {
}
fn pending_transactions(&self) -> Vec<SignedTransaction> {
self.miner.pending_transactions()
self.miner.pending_transactions(self.chain_info().best_block_number)
}
}

View File

@@ -20,6 +20,7 @@ mod message;
mod timeout;
mod params;
mod vote;
mod vote_collector;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use common::*;
@@ -246,9 +247,11 @@ impl Engine for Tendermint {
}
}
/// Set author to proposer.
/// Set author to proposer and set the correct round in the seal.
/// This assumes that all uncles are valid uncles (i.e. of at least one generation before the current).
fn on_close_block(&self, _block: &mut ExecutedBlock) {}
fn on_close_block(&self, _block: &mut ExecutedBlock) {
}
/// Attempt to seal the block internally using all available signatures.
///
@@ -278,11 +281,14 @@ impl Engine for Tendermint {
fn handle_message(&self, sender: Address, signature: H520, message: UntrustedRlp) -> Result<Bytes, Error> {
let message: ConsensusMessage = try!(message.as_val());
try!(Err(EngineError::UnknownStep))
//match message {
// ConsensusMessage::Prevote
//}
if self.is_authority(&sender) {
//match message {
// ConsensusMessage::Prevote
//}
}
try!(Err(EngineError::UnknownStep))
// Check if correct round.
//if self.r.load(AtomicOrdering::Relaxed) != try!(message.val_at(0)) {

View File

@@ -21,27 +21,35 @@ use util::Hashable;
use account_provider::AccountProvider;
use rlp::{View, DecoderError, Decodable, Decoder, Encodable, RlpStream, Stream};
use basic_types::Seal;
use super::BlockHash;
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Vote {
block_hash: BlockHash,
signature: H520
}
fn message(header: &Header) -> H256 {
fn block_hash(header: &Header) -> H256 {
header.rlp(Seal::WithSome(1)).sha3()
}
impl Vote {
fn new(signature: H520) -> Vote { Vote { signature: signature }}
fn new(block_hash: BlockHash, signature: H520) -> Vote {
Vote { block_hash: block_hash, signature: signature }
}
/// Try to use the author address to create a vote.
pub fn propose(header: &Header, accounts: &AccountProvider) -> Option<Vote> {
accounts.sign(*header.author(), message(&header)).ok().map(Into::into).map(Self::new)
Self::validate(header, accounts, *header.author())
}
/// Use any unlocked validator account to create a vote.
pub fn validate(header: &Header, accounts: &AccountProvider, validator: Address) -> Option<Vote> {
accounts.sign(validator, message(&header)).ok().map(Into::into).map(Self::new)
let message = block_hash(&header);
accounts.sign(validator, message)
.ok()
.map(Into::into)
.map(|sig| Self::new(message, sig))
}
}
@@ -51,13 +59,14 @@ impl Decodable for Vote {
if decoder.as_raw().len() != try!(rlp.payload_info()).total() {
return Err(DecoderError::RlpIsTooBig);
}
rlp.as_val().map(Self::new)
Ok(Self::new(try!(rlp.val_at(0)), try!(rlp.val_at(1))))
}
}
impl Encodable for Vote {
fn rlp_append(&self, s: &mut RlpStream) {
let Vote { ref signature } = *self;
let Vote { ref block_hash, ref signature } = *self;
s.append(block_hash);
s.append(signature);
}
}

View File

@@ -116,11 +116,11 @@ impl<Cost: CostType> evm::Evm for Interpreter<Cost> {
let instruction = code[reader.position];
reader.position += 1;
let info = infos[instruction as usize];
try!(self.verify_instruction(ext, instruction, &info, &stack));
let info = &infos[instruction as usize];
try!(self.verify_instruction(ext, instruction, info, &stack));
// Calculate gas cost
let (gas_cost, mem_gas, mem_size) = try!(gasometer.get_gas_cost_mem(ext, instruction, &info, &stack, self.mem.size()));
let (gas_cost, mem_gas, mem_size) = try!(gasometer.get_gas_cost_mem(ext, instruction, info, &stack, self.mem.size()));
// TODO: make compile-time removable if too much of a performance hit.
let trace_executed = ext.trace_prepare_execute(reader.position - 1, instruction, &gas_cost.as_u256());
@@ -129,7 +129,7 @@ impl<Cost: CostType> evm::Evm for Interpreter<Cost> {
gasometer.current_mem_gas = mem_gas;
gasometer.current_gas = gasometer.current_gas - gas_cost;
evm_debug!({ informant.before_instruction(reader.position, instruction, &info, &gasometer.current_gas, &stack) });
evm_debug!({ informant.before_instruction(reader.position, instruction, info, &gasometer.current_gas, &stack) });
let (mem_written, store_written) = match trace_executed {
true => (Self::mem_written(instruction, &stack), Self::store_written(instruction, &stack)),

View File

@@ -21,7 +21,7 @@ use util::sha3::*;
use bit_set::BitSet;
use super::super::instructions;
const CACHE_CODE_ITEMS: usize = 4096;
const CACHE_CODE_ITEMS: usize = 65536;
/// GLobal cache for EVM interpreter
pub struct SharedCache {

View File

@@ -25,10 +25,10 @@ use trace::{FlatTrace, Tracer, NoopTracer, ExecutiveTracer, VMTrace, VMTracer, E
use crossbeam;
pub use types::executed::{Executed, ExecutionResult};
/// Max depth to avoid stack overflow (when it's reached we start a new thread with VM)
/// Roughly estimate what stack size each level of evm depth will use
/// TODO [todr] We probably need some more sophisticated calculations here (limit on my machine 132)
/// Maybe something like here: `https://github.com/ethereum/libethereum/blob/4db169b8504f2b87f7d5a481819cfb959fc65f6c/libethereum/ExtVM.cpp`
const MAX_VM_DEPTH_FOR_THREAD: usize = 64;
const STACK_SIZE_PER_DEPTH: usize = 24*1024;
/// Returns new address created from address and given nonce.
pub fn contract_address(address: &Address, nonce: &U256) -> Address {
@@ -149,12 +149,13 @@ impl<'a> Executive<'a> {
// TODO: we might need bigints here, or at least check overflows.
let balance = self.state.balance(&sender);
let gas_cost = U512::from(t.gas) * U512::from(t.gas_price);
let gas_cost = t.gas.full_mul(t.gas_price);
let total_cost = U512::from(t.value) + gas_cost;
// avoid unaffordable transactions
if U512::from(balance) < total_cost {
return Err(From::from(ExecutionError::NotEnoughCash { required: total_cost, got: U512::from(balance) }));
let balance512 = U512::from(balance);
if balance512 < total_cost {
return Err(From::from(ExecutionError::NotEnoughCash { required: total_cost, got: balance512 }));
}
// NOTE: there can be no invalid transactions from this point.
@@ -212,8 +213,11 @@ impl<'a> Executive<'a> {
tracer: &mut T,
vm_tracer: &mut V
) -> evm::Result<U256> where T: Tracer, V: VMTracer {
let depth_threshold = ::io::LOCAL_STACK_SIZE.with(|sz| sz.get() / STACK_SIZE_PER_DEPTH);
// Ordinary execution - keep VM in same thread
if (self.depth + 1) % MAX_VM_DEPTH_FOR_THREAD != 0 {
if (self.depth + 1) % depth_threshold != 0 {
let vm_factory = self.vm_factory;
let mut ext = self.as_externalities(OriginInfo::from(&params), unconfirmed_substate, output_policy, tracer, vm_tracer);
trace!(target: "executive", "ext.schedule.have_delegate_call: {}", ext.schedule().have_delegate_call);
@@ -265,7 +269,7 @@ impl<'a> Executive<'a> {
let cost = self.engine.cost_of_builtin(&params.code_address, data);
if cost <= params.gas {
self.engine.execute_builtin(&params.code_address, data, &mut output);
self.state.clear_snapshot();
self.state.discard_snapshot();
// trace only top level calls to builtins to avoid DDoS attacks
if self.depth == 0 {
@@ -285,7 +289,7 @@ impl<'a> Executive<'a> {
Ok(params.gas - cost)
} else {
// just drain the whole gas
self.state.revert_snapshot();
self.state.revert_to_snapshot();
tracer.trace_failed_call(trace_info, vec![], evm::Error::OutOfGas.into());
@@ -331,7 +335,7 @@ impl<'a> Executive<'a> {
res
} else {
// otherwise it's just a basic transaction, only do tracing, if necessary.
self.state.clear_snapshot();
self.state.discard_snapshot();
tracer.trace_call(trace_info, U256::zero(), trace_output, vec![]);
Ok(params.gas)
@@ -413,7 +417,7 @@ impl<'a> Executive<'a> {
// real ammount to refund
let gas_left_prerefund = match result { Ok(x) => x, _ => 0.into() };
let refunded = cmp::min(refunds_bound, (t.gas - gas_left_prerefund) / U256::from(2));
let refunded = cmp::min(refunds_bound, (t.gas - gas_left_prerefund) >> 1);
let gas_left = gas_left_prerefund + refunded;
let gas_used = t.gas - gas_left;
@@ -473,10 +477,10 @@ impl<'a> Executive<'a> {
| Err(evm::Error::BadInstruction {.. })
| Err(evm::Error::StackUnderflow {..})
| Err(evm::Error::OutOfStack {..}) => {
self.state.revert_snapshot();
self.state.revert_to_snapshot();
},
Ok(_) | Err(evm::Error::Internal) => {
self.state.clear_snapshot();
self.state.discard_snapshot();
substate.accrue(un_substate);
}
}

View File

@@ -48,6 +48,17 @@ pub enum PendingSet {
SealingOrElseQueue,
}
/// Type of the gas limit to apply to the transaction queue.
#[derive(Debug, PartialEq)]
pub enum GasLimit {
/// Depends on the block gas limit and is updated with every block.
Auto,
/// No limit.
None,
/// Set to a fixed gas value.
Fixed(U256),
}
/// Configures the behaviour of the miner.
#[derive(Debug, PartialEq)]
pub struct MinerOptions {
@@ -71,6 +82,8 @@ pub struct MinerOptions {
pub work_queue_size: usize,
/// Can we submit two different solutions for the same block and expect both to result in an import?
pub enable_resubmission: bool,
/// Global gas limit for all transaction in the queue except for local and retracted.
pub tx_queue_gas_limit: GasLimit,
}
impl Default for MinerOptions {
@@ -81,11 +94,12 @@ impl Default for MinerOptions {
reseal_on_external_tx: false,
reseal_on_own_tx: true,
tx_gas_limit: !U256::zero(),
tx_queue_size: 1024,
tx_queue_size: 2048,
pending_set: PendingSet::AlwaysQueue,
reseal_min_period: Duration::from_secs(2),
work_queue_size: 20,
enable_resubmission: true,
tx_queue_gas_limit: GasLimit::Auto,
}
}
}
@@ -194,7 +208,11 @@ impl Miner {
true => None,
false => Some(WorkPoster::new(&options.new_work_notify))
};
let txq = Arc::new(Mutex::new(TransactionQueue::with_limits(options.tx_queue_size, options.tx_gas_limit)));
let gas_limit = match options.tx_queue_gas_limit {
GasLimit::Fixed(ref limit) => *limit,
_ => !U256::zero(),
};
let txq = Arc::new(Mutex::new(TransactionQueue::with_limits(options.tx_queue_size, gas_limit, options.tx_gas_limit)));
Miner {
transaction_queue: txq,
next_allowed_reseal: Mutex::new(Instant::now()),
@@ -443,6 +461,10 @@ impl Miner {
let gas_limit = HeaderView::new(&chain.best_block_header()).gas_limit();
let mut queue = self.transaction_queue.lock();
queue.set_gas_limit(gas_limit);
if let GasLimit::Auto = self.options.tx_queue_gas_limit {
// Set total tx queue gas limit to be 2x the block gas limit.
queue.set_total_gas_limit(gas_limit << 1);
}
}
/// Returns true if we had to prepare new pending block.
@@ -493,6 +515,21 @@ impl Miner {
/// Are we allowed to do a non-mandatory reseal?
fn tx_reseal_allowed(&self) -> bool { Instant::now() > *self.next_allowed_reseal.lock() }
fn from_pending_block<H, F, G>(&self, latest_block_number: BlockNumber, from_chain: F, map_block: G) -> H
where F: Fn() -> H, G: Fn(&ClosedBlock) -> H {
let sealing_work = self.sealing_work.lock();
sealing_work.queue.peek_last_ref().map_or_else(
|| from_chain(),
|b| {
if b.block().header().number() > latest_block_number {
map_block(b)
} else {
from_chain()
}
}
)
}
}
const SEALING_TIMEOUT_IN_BLOCKS : u64 = 5;
@@ -565,29 +602,35 @@ impl MinerService for Miner {
}
fn balance(&self, chain: &MiningBlockChainClient, address: &Address) -> U256 {
let sealing_work = self.sealing_work.lock();
sealing_work.queue.peek_last_ref().map_or_else(
self.from_pending_block(
chain.chain_info().best_block_number,
|| chain.latest_balance(address),
|b| b.block().fields().state.balance(address)
)
}
fn storage_at(&self, chain: &MiningBlockChainClient, address: &Address, position: &H256) -> H256 {
let sealing_work = self.sealing_work.lock();
sealing_work.queue.peek_last_ref().map_or_else(
self.from_pending_block(
chain.chain_info().best_block_number,
|| chain.latest_storage_at(address, position),
|b| b.block().fields().state.storage_at(address, position)
)
}
fn nonce(&self, chain: &MiningBlockChainClient, address: &Address) -> U256 {
let sealing_work = self.sealing_work.lock();
sealing_work.queue.peek_last_ref().map_or_else(|| chain.latest_nonce(address), |b| b.block().fields().state.nonce(address))
self.from_pending_block(
chain.chain_info().best_block_number,
|| chain.latest_nonce(address),
|b| b.block().fields().state.nonce(address)
)
}
fn code(&self, chain: &MiningBlockChainClient, address: &Address) -> Option<Bytes> {
let sealing_work = self.sealing_work.lock();
sealing_work.queue.peek_last_ref().map_or_else(|| chain.latest_code(address), |b| b.block().fields().state.code(address).map(|c| (*c).clone()))
self.from_pending_block(
chain.chain_info().best_block_number,
|| chain.latest_code(address),
|b| b.block().fields().state.code(address).map(|c| (*c).clone())
)
}
fn set_author(&self, author: Address) {
@@ -737,50 +780,74 @@ impl MinerService for Miner {
queue.top_transactions()
}
fn pending_transactions(&self) -> Vec<SignedTransaction> {
fn pending_transactions(&self, best_block: BlockNumber) -> Vec<SignedTransaction> {
let queue = self.transaction_queue.lock();
let sw = self.sealing_work.lock();
// TODO: should only use the sealing_work when it's current (it could be an old block)
let sealing_set = match sw.enabled {
true => sw.queue.peek_last_ref(),
false => None,
};
match (&self.options.pending_set, sealing_set) {
(&PendingSet::AlwaysQueue, _) | (&PendingSet::SealingOrElseQueue, None) => queue.top_transactions(),
(_, sealing) => sealing.map_or_else(Vec::new, |s| s.transactions().to_owned()),
match self.options.pending_set {
PendingSet::AlwaysQueue => queue.top_transactions(),
PendingSet::SealingOrElseQueue => {
self.from_pending_block(
best_block,
|| queue.top_transactions(),
|sealing| sealing.transactions().to_owned()
)
},
PendingSet::AlwaysSealing => {
self.from_pending_block(
best_block,
|| vec![],
|sealing| sealing.transactions().to_owned()
)
},
}
}
fn pending_transactions_hashes(&self) -> Vec<H256> {
fn pending_transactions_hashes(&self, best_block: BlockNumber) -> Vec<H256> {
let queue = self.transaction_queue.lock();
let sw = self.sealing_work.lock();
let sealing_set = match sw.enabled {
true => sw.queue.peek_last_ref(),
false => None,
};
match (&self.options.pending_set, sealing_set) {
(&PendingSet::AlwaysQueue, _) | (&PendingSet::SealingOrElseQueue, None) => queue.pending_hashes(),
(_, sealing) => sealing.map_or_else(Vec::new, |s| s.transactions().iter().map(|t| t.hash()).collect()),
match self.options.pending_set {
PendingSet::AlwaysQueue => queue.pending_hashes(),
PendingSet::SealingOrElseQueue => {
self.from_pending_block(
best_block,
|| queue.pending_hashes(),
|sealing| sealing.transactions().iter().map(|t| t.hash()).collect()
)
},
PendingSet::AlwaysSealing => {
self.from_pending_block(
best_block,
|| vec![],
|sealing| sealing.transactions().iter().map(|t| t.hash()).collect()
)
},
}
}
fn transaction(&self, hash: &H256) -> Option<SignedTransaction> {
fn transaction(&self, best_block: BlockNumber, hash: &H256) -> Option<SignedTransaction> {
let queue = self.transaction_queue.lock();
let sw = self.sealing_work.lock();
let sealing_set = match sw.enabled {
true => sw.queue.peek_last_ref(),
false => None,
};
match (&self.options.pending_set, sealing_set) {
(&PendingSet::AlwaysQueue, _) | (&PendingSet::SealingOrElseQueue, None) => queue.find(hash),
(_, sealing) => sealing.and_then(|s| s.transactions().iter().find(|t| &t.hash() == hash).cloned()),
match self.options.pending_set {
PendingSet::AlwaysQueue => queue.find(hash),
PendingSet::SealingOrElseQueue => {
self.from_pending_block(
best_block,
|| queue.find(hash),
|sealing| sealing.transactions().iter().find(|t| &t.hash() == hash).cloned()
)
},
PendingSet::AlwaysSealing => {
self.from_pending_block(
best_block,
|| None,
|sealing| sealing.transactions().iter().find(|t| &t.hash() == hash).cloned()
)
},
}
}
fn pending_receipt(&self, hash: &H256) -> Option<RichReceipt> {
let sealing_work = self.sealing_work.lock();
match (sealing_work.enabled, sealing_work.queue.peek_last_ref()) {
(true, Some(pending)) => {
fn pending_receipt(&self, best_block: BlockNumber, hash: &H256) -> Option<RichReceipt> {
self.from_pending_block(
best_block,
|| None,
|pending| {
let txs = pending.transactions();
txs.iter()
.map(|t| t.hash())
@@ -801,15 +868,15 @@ impl MinerService for Miner {
logs: receipt.logs.clone(),
}
})
},
_ => None
}
}
)
}
fn pending_receipts(&self) -> BTreeMap<H256, Receipt> {
let sealing_work = self.sealing_work.lock();
match (sealing_work.enabled, sealing_work.queue.peek_last_ref()) {
(true, Some(pending)) => {
fn pending_receipts(&self, best_block: BlockNumber) -> BTreeMap<H256, Receipt> {
self.from_pending_block(
best_block,
|| BTreeMap::new(),
|pending| {
let hashes = pending.transactions()
.iter()
.map(|t| t.hash());
@@ -817,9 +884,8 @@ impl MinerService for Miner {
let receipts = pending.receipts().iter().cloned();
hashes.zip(receipts).collect()
},
_ => BTreeMap::new()
}
}
)
}
fn last_nonce(&self, address: &Address) -> Option<U256> {
@@ -1016,6 +1082,7 @@ mod tests {
reseal_min_period: Duration::from_secs(5),
tx_gas_limit: !U256::zero(),
tx_queue_size: 1024,
tx_queue_gas_limit: GasLimit::None,
pending_set: PendingSet::AlwaysSealing,
work_queue_size: 5,
enable_resubmission: true,
@@ -1044,34 +1111,54 @@ mod tests {
let client = TestBlockChainClient::default();
let miner = miner();
let transaction = transaction();
let best_block = 0;
// when
let res = miner.import_own_transaction(&client, transaction);
// then
assert_eq!(res.unwrap(), TransactionImportResult::Current);
assert_eq!(miner.all_transactions().len(), 1);
assert_eq!(miner.pending_transactions().len(), 1);
assert_eq!(miner.pending_transactions_hashes().len(), 1);
assert_eq!(miner.pending_receipts().len(), 1);
assert_eq!(miner.pending_transactions(best_block).len(), 1);
assert_eq!(miner.pending_transactions_hashes(best_block).len(), 1);
assert_eq!(miner.pending_receipts(best_block).len(), 1);
// This method will let us know if pending block was created (before calling that method)
assert!(!miner.prepare_work_sealing(&client));
}
#[test]
fn should_not_use_pending_block_if_best_block_is_higher() {
// given
let client = TestBlockChainClient::default();
let miner = miner();
let transaction = transaction();
let best_block = 10;
// when
let res = miner.import_own_transaction(&client, transaction);
// then
assert_eq!(res.unwrap(), TransactionImportResult::Current);
assert_eq!(miner.all_transactions().len(), 1);
assert_eq!(miner.pending_transactions(best_block).len(), 0);
assert_eq!(miner.pending_transactions_hashes(best_block).len(), 0);
assert_eq!(miner.pending_receipts(best_block).len(), 0);
}
#[test]
fn should_import_external_transaction() {
// given
let client = TestBlockChainClient::default();
let miner = miner();
let transaction = transaction();
let best_block = 0;
// when
let res = miner.import_external_transactions(&client, vec![transaction]).pop().unwrap();
// then
assert_eq!(res.unwrap(), TransactionImportResult::Current);
assert_eq!(miner.all_transactions().len(), 1);
assert_eq!(miner.pending_transactions_hashes().len(), 0);
assert_eq!(miner.pending_transactions().len(), 0);
assert_eq!(miner.pending_receipts().len(), 0);
assert_eq!(miner.pending_transactions_hashes(best_block).len(), 0);
assert_eq!(miner.pending_transactions(best_block).len(), 0);
assert_eq!(miner.pending_receipts(best_block).len(), 0);
// This method will let us know if pending block was created (before calling that method)
assert!(miner.prepare_work_sealing(&client));
}

View File

@@ -48,7 +48,7 @@ mod work_notify;
mod price_info;
pub use self::transaction_queue::{TransactionQueue, AccountDetails, TransactionOrigin};
pub use self::miner::{Miner, MinerOptions, PendingSet, GasPricer, GasPriceCalibratorOptions};
pub use self::miner::{Miner, MinerOptions, PendingSet, GasPricer, GasPriceCalibratorOptions, GasLimit};
pub use self::external::{ExternalMiner, ExternalMinerService};
pub use client::TransactionImportResult;
@@ -56,6 +56,7 @@ use std::collections::BTreeMap;
use util::{H256, U256, Address, Bytes};
use client::{MiningBlockChainClient, Executed, CallAnalytics};
use block::ClosedBlock;
use header::BlockNumber;
use receipt::{RichReceipt, Receipt};
use error::{Error, CallError};
use transaction::SignedTransaction;
@@ -115,7 +116,7 @@ pub trait MinerService : Send + Sync {
Result<TransactionImportResult, Error>;
/// Returns hashes of transactions currently in pending
fn pending_transactions_hashes(&self) -> Vec<H256>;
fn pending_transactions_hashes(&self, best_block: BlockNumber) -> Vec<H256>;
/// Removes all transactions from the queue and restart mining operation.
fn clear_and_reset(&self, chain: &MiningBlockChainClient);
@@ -135,19 +136,19 @@ pub trait MinerService : Send + Sync {
where F: FnOnce(&ClosedBlock) -> T, Self: Sized;
/// Query pending transactions for hash.
fn transaction(&self, hash: &H256) -> Option<SignedTransaction>;
fn transaction(&self, best_block: BlockNumber, hash: &H256) -> Option<SignedTransaction>;
/// Get a list of all transactions.
fn all_transactions(&self) -> Vec<SignedTransaction>;
/// Get a list of all pending transactions.
fn pending_transactions(&self) -> Vec<SignedTransaction>;
fn pending_transactions(&self, best_block: BlockNumber) -> Vec<SignedTransaction>;
/// Get a list of all pending receipts.
fn pending_receipts(&self) -> BTreeMap<H256, Receipt>;
fn pending_receipts(&self, best_block: BlockNumber) -> BTreeMap<H256, Receipt>;
/// Get a particular reciept.
fn pending_receipt(&self, hash: &H256) -> Option<RichReceipt>;
fn pending_receipt(&self, best_block: BlockNumber, hash: &H256) -> Option<RichReceipt>;
/// Returns highest transaction nonce for given address.
fn last_nonce(&self, address: &Address) -> Option<U256>;

View File

@@ -130,6 +130,8 @@ struct TransactionOrder {
/// (e.g. Tx(nonce:5), State(nonce:0) -> height: 5)
/// High nonce_height = Low priority (processed later)
nonce_height: U256,
/// Gas specified in the transaction.
gas: U256,
/// Gas Price of the transaction.
/// Low gas price = Low priority (processed later)
gas_price: U256,
@@ -146,6 +148,7 @@ impl TransactionOrder {
fn for_transaction(tx: &VerifiedTransaction, base_nonce: U256) -> Self {
TransactionOrder {
nonce_height: tx.nonce() - base_nonce,
gas: tx.transaction.gas.clone(),
gas_price: tx.transaction.gas_price,
hash: tx.hash(),
origin: tx.origin,
@@ -287,6 +290,7 @@ struct TransactionSet {
by_address: Table<Address, U256, TransactionOrder>,
by_gas_price: GasPriceQueue,
limit: usize,
gas_limit: U256,
}
impl TransactionSet {
@@ -317,15 +321,20 @@ impl TransactionSet {
/// It drops transactions from this set but also removes associated `VerifiedTransaction`.
/// Returns addresses and lowest nonces of transactions removed because of limit.
fn enforce_limit(&mut self, by_hash: &mut HashMap<H256, VerifiedTransaction>) -> Option<HashMap<Address, U256>> {
let len = self.by_priority.len();
if len <= self.limit {
return None;
}
let mut count = 0;
let mut gas: U256 = 0.into();
let to_drop : Vec<(Address, U256)> = {
self.by_priority
.iter()
.skip(self.limit)
.skip_while(|order| {
count = count + 1;
let r = gas.overflowing_add(order.gas);
if r.1 { return false }
gas = r.0;
// Own and retracted transactions are allowed to go above the gas limit, bot not above the count limit.
(gas <= self.gas_limit || order.origin == TransactionOrigin::Local || order.origin == TransactionOrigin::RetractedBlock) &&
count <= self.limit
})
.map(|order| by_hash.get(&order.hash)
.expect("All transactions in `self.by_priority` and `self.by_address` are kept in sync with `by_hash`."))
.map(|tx| (tx.sender(), tx.nonce()))
@@ -432,16 +441,17 @@ impl Default for TransactionQueue {
impl TransactionQueue {
/// Creates new instance of this Queue
pub fn new() -> Self {
Self::with_limits(1024, !U256::zero())
Self::with_limits(1024, !U256::zero(), !U256::zero())
}
/// Create new instance of this Queue with specified limits
pub fn with_limits(limit: usize, tx_gas_limit: U256) -> Self {
pub fn with_limits(limit: usize, gas_limit: U256, tx_gas_limit: U256) -> Self {
let current = TransactionSet {
by_priority: BTreeSet::new(),
by_address: Table::new(),
by_gas_price: Default::default(),
limit: limit,
gas_limit: gas_limit,
};
let future = TransactionSet {
@@ -449,6 +459,7 @@ impl TransactionQueue {
by_address: Table::new(),
by_gas_price: Default::default(),
limit: limit,
gas_limit: gas_limit,
};
TransactionQueue {
@@ -504,6 +515,13 @@ impl TransactionQueue {
};
}
/// Sets new total gas limit.
pub fn set_total_gas_limit(&mut self, gas_limit: U256) {
self.future.gas_limit = gas_limit;
self.current.gas_limit = gas_limit;
self.future.enforce_limit(&mut self.by_hash);
}
/// Set the new limit for the amount of gas any individual transaction may have.
/// Any transaction already imported to the queue is not affected.
pub fn set_tx_gas_limit(&mut self, limit: U256) {
@@ -636,7 +654,7 @@ impl TransactionQueue {
};
for k in nonces_from_sender {
let order = self.future.drop(&sender, &k).unwrap();
self.current.insert(sender, k, order.penalize());
self.future.insert(sender, k, order.penalize());
}
}
@@ -735,6 +753,15 @@ impl TransactionQueue {
.collect()
}
#[cfg(test)]
fn future_transactions(&self) -> Vec<SignedTransaction> {
self.future.by_priority
.iter()
.map(|t| self.by_hash.get(&t.hash).expect("All transactions in `current` and `future` are always included in `by_hash`"))
.map(|t| t.transaction.clone())
.collect()
}
/// Returns hashes of all transactions from current, ordered by priority.
pub fn pending_hashes(&self) -> Vec<H256> {
self.current.by_priority
@@ -818,6 +845,16 @@ impl TransactionQueue {
let nonce = tx.nonce();
let hash = tx.hash();
{
// Rough size sanity check
let gas = &tx.transaction.gas;
if U256::from(tx.transaction.data.len()) > *gas {
// Droping transaction
trace!(target: "txqueue", "Dropping oversized transaction: {:?} (gas: {} < size {})", hash, gas, tx.transaction.data.len());
return Err(TransactionError::LimitReached);
}
}
// The transaction might be old, let's check that.
// This has to be the first test, otherwise calculating
// nonce height would result in overflow.
@@ -970,6 +1007,7 @@ mod test {
}
fn default_nonce() -> U256 { 123.into() }
fn default_gas_val() -> U256 { 100_000.into() }
fn default_gas_price() -> U256 { 1.into() }
fn new_unsigned_tx(nonce: U256, gas_price: U256) -> Transaction {
@@ -977,7 +1015,7 @@ mod test {
action: Action::Create,
value: U256::from(100),
data: "3331600055".from_hex().unwrap(),
gas: U256::from(100_000),
gas: default_gas_val(),
gas_price: gas_price,
nonce: nonce
}
@@ -1042,7 +1080,7 @@ mod test {
#[test]
fn should_return_correct_nonces_when_dropped_because_of_limit() {
// given
let mut txq = TransactionQueue::with_limits(2, !U256::zero());
let mut txq = TransactionQueue::with_limits(2, !U256::zero(), !U256::zero());
let (tx1, tx2) = new_tx_pair(123.into(), 1.into(), 1.into(), 0.into());
let sender = tx1.sender().unwrap();
let nonce = tx1.nonce;
@@ -1080,7 +1118,8 @@ mod test {
by_priority: BTreeSet::new(),
by_address: Table::new(),
by_gas_price: Default::default(),
limit: 1
limit: 1,
gas_limit: !U256::zero(),
};
let (tx1, tx2) = new_tx_pair_default(1.into(), 0.into());
let tx1 = VerifiedTransaction::new(tx1, TransactionOrigin::External).unwrap();
@@ -1120,7 +1159,8 @@ mod test {
by_priority: BTreeSet::new(),
by_address: Table::new(),
by_gas_price: Default::default(),
limit: 1
limit: 1,
gas_limit: !U256::zero(),
};
// Create two transactions with same nonce
// (same hash)
@@ -1168,7 +1208,8 @@ mod test {
by_priority: BTreeSet::new(),
by_address: Table::new(),
by_gas_price: Default::default(),
limit: 2
limit: 2,
gas_limit: !U256::zero(),
};
let tx = new_tx_default();
let tx1 = VerifiedTransaction::new(tx.clone(), TransactionOrigin::External).unwrap();
@@ -1185,7 +1226,8 @@ mod test {
by_priority: BTreeSet::new(),
by_address: Table::new(),
by_gas_price: Default::default(),
limit: 1
limit: 1,
gas_limit: !U256::zero(),
};
assert_eq!(set.gas_price_entry_limit(), 0.into());
@@ -1463,6 +1505,36 @@ mod test {
assert_eq!(top.len(), 2);
}
#[test]
fn should_penalize_transactions_from_sender_in_future() {
// given
let prev_nonce = |a: &Address| AccountDetails{ nonce: default_account_details(a).nonce - U256::one(), balance: !U256::zero() };
let mut txq = TransactionQueue::new();
// txa, txb - slightly bigger gas price to have consistent ordering
let (txa, txb) = new_tx_pair_default(1.into(), 0.into());
let (tx1, tx2) = new_tx_pair_with_gas_price_increment(3.into());
// insert everything
txq.add(txa.clone(), &prev_nonce, TransactionOrigin::External).unwrap();
txq.add(txb.clone(), &prev_nonce, TransactionOrigin::External).unwrap();
txq.add(tx1.clone(), &prev_nonce, TransactionOrigin::External).unwrap();
txq.add(tx2.clone(), &prev_nonce, TransactionOrigin::External).unwrap();
assert_eq!(txq.status().future, 4);
// when
txq.penalize(&tx1.hash());
// then
let top = txq.future_transactions();
assert_eq!(top[0], txa);
assert_eq!(top[1], txb);
assert_eq!(top[2], tx1);
assert_eq!(top[3], tx2);
assert_eq!(top.len(), 4);
}
#[test]
fn should_penalize_transactions_from_sender() {
// given
@@ -1651,7 +1723,7 @@ mod test {
#[test]
fn should_drop_old_transactions_when_hitting_the_limit() {
// given
let mut txq = TransactionQueue::with_limits(1, !U256::zero());
let mut txq = TransactionQueue::with_limits(1, !U256::zero(), !U256::zero());
let (tx, tx2) = new_tx_pair_default(1.into(), 0.into());
let sender = tx.sender().unwrap();
let nonce = tx.nonce;
@@ -1672,7 +1744,7 @@ mod test {
#[test]
fn should_limit_future_transactions() {
let mut txq = TransactionQueue::with_limits(1, !U256::zero());
let mut txq = TransactionQueue::with_limits(1, !U256::zero(), !U256::zero());
txq.current.set_limit(10);
let (tx1, tx2) = new_tx_pair_default(4.into(), 1.into());
let (tx3, tx4) = new_tx_pair_default(4.into(), 2.into());
@@ -1689,6 +1761,30 @@ mod test {
assert_eq!(txq.status().future, 1);
}
#[test]
fn should_limit_by_gas() {
let mut txq = TransactionQueue::with_limits(100, default_gas_val() * U256::from(2), !U256::zero());
let (tx1, tx2) = new_tx_pair_default(U256::from(1), U256::from(1));
let (tx3, tx4) = new_tx_pair_default(U256::from(1), U256::from(2));
txq.add(tx1.clone(), &default_account_details, TransactionOrigin::External).ok();
txq.add(tx2.clone(), &default_account_details, TransactionOrigin::External).ok();
txq.add(tx3.clone(), &default_account_details, TransactionOrigin::External).ok();
txq.add(tx4.clone(), &default_account_details, TransactionOrigin::External).ok();
assert_eq!(txq.status().pending, 2);
}
#[test]
fn should_keep_own_transactions_above_gas_limit() {
let mut txq = TransactionQueue::with_limits(100, default_gas_val() * U256::from(2), !U256::zero());
let (tx1, tx2) = new_tx_pair_default(U256::from(1), U256::from(1));
let (tx3, tx4) = new_tx_pair_default(U256::from(1), U256::from(2));
txq.add(tx1.clone(), &default_account_details, TransactionOrigin::Local).unwrap();
txq.add(tx2.clone(), &default_account_details, TransactionOrigin::Local).unwrap();
txq.add(tx3.clone(), &default_account_details, TransactionOrigin::Local).unwrap();
txq.add(tx4.clone(), &default_account_details, TransactionOrigin::Local).unwrap();
assert_eq!(txq.status().pending, 4);
}
#[test]
fn should_drop_transactions_with_old_nonces() {
let mut txq = TransactionQueue::new();
@@ -1932,7 +2028,7 @@ mod test {
#[test]
fn should_keep_right_order_in_future() {
// given
let mut txq = TransactionQueue::with_limits(1, !U256::zero());
let mut txq = TransactionQueue::with_limits(1, !U256::zero(), !U256::zero());
let (tx1, tx2) = new_tx_pair_default(1.into(), 0.into());
let prev_nonce = |a: &Address| AccountDetails { nonce: default_account_details(a).nonce - U256::one(), balance:
default_account_details(a).balance };

View File

@@ -51,7 +51,7 @@ use rand::{Rng, OsRng};
pub use self::error::Error;
pub use self::service::{Service, DatabaseRestore};
pub use self::traits::{SnapshotService, RemoteSnapshotService};
pub use self::traits::SnapshotService;
pub use self::watcher::Watcher;
pub use types::snapshot_manifest::ManifestData;
pub use types::restoration_status::RestorationStatus;
@@ -67,6 +67,12 @@ mod watcher;
#[cfg(test)]
mod tests;
/// IPC interfaces
#[cfg(feature="ipc")]
pub mod remote {
pub use super::traits::RemoteSnapshotService;
}
mod traits {
#![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues
include!(concat!(env!("OUT_DIR"), "/snapshot_service_trait.rs"));

View File

@@ -16,7 +16,6 @@
//! Single account in the system.
use std::collections::hash_map::Entry;
use util::*;
use pod_account::*;
use rlp::*;
@@ -24,9 +23,11 @@ use lru_cache::LruCache;
use std::cell::{RefCell, Cell};
const STORAGE_CACHE_ITEMS: usize = 4096;
const STORAGE_CACHE_ITEMS: usize = 8192;
/// Single account in the system.
/// Keeps track of changes to the code and storage.
/// The changes are applied in `commit_storage` and `commit_code`
pub struct Account {
// Balance of the account.
balance: U256,
@@ -46,8 +47,6 @@ pub struct Account {
code_size: Option<usize>,
// Code cache of the account.
code_cache: Arc<Bytes>,
// Account is new or has been modified.
filth: Filth,
// Account code new or has been modified.
code_filth: Filth,
// Cached address hash.
@@ -67,7 +66,6 @@ impl Account {
code_hash: code.sha3(),
code_size: Some(code.len()),
code_cache: Arc::new(code),
filth: Filth::Dirty,
code_filth: Filth::Dirty,
address_hash: Cell::new(None),
}
@@ -89,7 +87,6 @@ impl Account {
code_filth: Filth::Dirty,
code_size: Some(pod.code.as_ref().map_or(0, |c| c.len())),
code_cache: Arc::new(pod.code.map_or_else(|| { warn!("POD account with unknown code is being created! Assuming no code."); vec![] }, |c| c)),
filth: Filth::Dirty,
address_hash: Cell::new(None),
}
}
@@ -105,7 +102,6 @@ impl Account {
code_hash: SHA3_EMPTY,
code_cache: Arc::new(vec![]),
code_size: Some(0),
filth: Filth::Dirty,
code_filth: Filth::Clean,
address_hash: Cell::new(None),
}
@@ -123,7 +119,6 @@ impl Account {
code_hash: r.val_at(3),
code_cache: Arc::new(vec![]),
code_size: None,
filth: Filth::Clean,
code_filth: Filth::Clean,
address_hash: Cell::new(None),
}
@@ -141,7 +136,6 @@ impl Account {
code_hash: SHA3_EMPTY,
code_cache: Arc::new(vec![]),
code_size: None,
filth: Filth::Dirty,
code_filth: Filth::Clean,
address_hash: Cell::new(None),
}
@@ -153,7 +147,6 @@ impl Account {
self.code_hash = code.sha3();
self.code_cache = Arc::new(code);
self.code_size = Some(self.code_cache.len());
self.filth = Filth::Dirty;
self.code_filth = Filth::Dirty;
}
@@ -164,17 +157,7 @@ impl Account {
/// Set (and cache) the contents of the trie's storage at `key` to `value`.
pub fn set_storage(&mut self, key: H256, value: H256) {
match self.storage_changes.entry(key) {
Entry::Occupied(ref mut entry) if entry.get() != &value => {
entry.insert(value);
self.filth = Filth::Dirty;
},
Entry::Vacant(entry) => {
entry.insert(value);
self.filth = Filth::Dirty;
},
_ => {},
}
self.storage_changes.insert(key, value);
}
/// Get (and cache) the contents of the trie's storage at `key`.
@@ -263,17 +246,6 @@ impl Account {
!self.code_cache.is_empty() || (self.code_cache.is_empty() && self.code_hash == SHA3_EMPTY)
}
/// Is this a new or modified account?
pub fn is_dirty(&self) -> bool {
self.filth == Filth::Dirty || self.code_filth == Filth::Dirty || !self.storage_is_clean()
}
/// Mark account as clean.
pub fn set_clean(&mut self) {
assert!(self.storage_is_clean());
self.filth = Filth::Clean
}
/// Provide a database to get `code_hash`. Should not be called if it is a contract without code.
pub fn cache_code(&mut self, db: &HashDB) -> bool {
// TODO: fill out self.code_cache;
@@ -326,25 +298,18 @@ impl Account {
/// Increment the nonce of the account by one.
pub fn inc_nonce(&mut self) {
self.nonce = self.nonce + U256::from(1u8);
self.filth = Filth::Dirty;
}
/// Increment the nonce of the account by one.
/// Increase account balance.
pub fn add_balance(&mut self, x: &U256) {
if !x.is_zero() {
self.balance = self.balance + *x;
self.filth = Filth::Dirty;
}
self.balance = self.balance + *x;
}
/// Increment the nonce of the account by one.
/// Decrease account balance.
/// Panics if balance is less than `x`
pub fn sub_balance(&mut self, x: &U256) {
if !x.is_zero() {
assert!(self.balance >= *x);
self.balance = self.balance - *x;
self.filth = Filth::Dirty;
}
assert!(self.balance >= *x);
self.balance = self.balance - *x;
}
/// Commit the `storage_changes` to the backing DB and update `storage_root`.
@@ -406,7 +371,6 @@ impl Account {
code_hash: self.code_hash.clone(),
code_size: self.code_size.clone(),
code_cache: self.code_cache.clone(),
filth: self.filth,
code_filth: self.code_filth,
address_hash: self.address_hash.clone(),
}
@@ -427,10 +391,10 @@ impl Account {
account
}
/// Replace self with the data from other account merging storage cache
pub fn merge_with(&mut self, other: Account) {
assert!(self.storage_is_clean());
assert!(other.storage_is_clean());
/// Replace self with the data from other account merging storage cache.
/// Basic account data and all modifications are overwritten
/// with new values.
pub fn overwrite_with(&mut self, other: Account) {
self.balance = other.balance;
self.nonce = other.nonce;
self.storage_root = other.storage_root;
@@ -443,6 +407,7 @@ impl Account {
for (k, v) in other.storage_cache.into_inner().into_iter() {
cache.insert(k.clone() , v.clone()); //TODO: cloning should not be required here
}
self.storage_changes = other.storage_changes;
}
}

View File

@@ -15,6 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::cell::{RefCell, RefMut};
use std::collections::hash_map::Entry;
use common::*;
use engines::Engine;
use executive::{Executive, TransactOptions};
@@ -42,42 +43,93 @@ pub struct ApplyOutcome {
/// Result type for the execution ("application") of a transaction.
pub type ApplyResult = Result<ApplyOutcome, Error>;
#[derive(Debug)]
enum AccountEntry {
/// Contains account data.
Cached(Account),
/// Account has been deleted.
Killed,
/// Account does not exist.
Missing,
#[derive(Eq, PartialEq, Clone, Copy, Debug)]
/// Account modification state. Used to check if the account was
/// Modified in between commits and overall.
enum AccountState {
/// Account was loaded from disk and never modified in this state object.
CleanFresh,
/// Account was loaded from the global cache and never modified.
CleanCached,
/// Account has been modified and is not committed to the trie yet.
/// This is set if any of the account data is changed, including
/// storage and code.
Dirty,
/// Account was modified and committed to the trie.
Committed,
}
#[derive(Debug)]
/// In-memory copy of the account data. Holds the optional account
/// and the modification status.
/// Account entry can contain existing (`Some`) or non-existing
/// account (`None`)
struct AccountEntry {
account: Option<Account>,
state: AccountState,
}
// Account cache item. Contains account data and
// modification state
impl AccountEntry {
fn is_dirty(&self) -> bool {
match *self {
AccountEntry::Cached(ref a) => a.is_dirty(),
AccountEntry::Killed => true,
AccountEntry::Missing => false,
}
self.state == AccountState::Dirty
}
/// Clone dirty data into new `AccountEntry`.
/// Clone dirty data into new `AccountEntry`. This includes
/// basic account data and modified storage keys.
/// Returns None if clean.
fn clone_dirty(&self) -> Option<AccountEntry> {
match *self {
AccountEntry::Cached(ref acc) if acc.is_dirty() => Some(AccountEntry::Cached(acc.clone_dirty())),
AccountEntry::Killed => Some(AccountEntry::Killed),
_ => None,
fn clone_if_dirty(&self) -> Option<AccountEntry> {
match self.is_dirty() {
true => Some(self.clone_dirty()),
false => None,
}
}
/// Clone account entry data that needs to be saved in the snapshot.
/// This includes basic account information and all locally cached storage keys
fn clone_for_snapshot(&self) -> AccountEntry {
match *self {
AccountEntry::Cached(ref acc) => AccountEntry::Cached(acc.clone_all()),
AccountEntry::Killed => AccountEntry::Killed,
AccountEntry::Missing => AccountEntry::Missing,
/// Clone dirty data into new `AccountEntry`. This includes
/// basic account data and modified storage keys.
fn clone_dirty(&self) -> AccountEntry {
AccountEntry {
account: self.account.as_ref().map(Account::clone_dirty),
state: self.state,
}
}
// Create a new account entry and mark it as dirty.
fn new_dirty(account: Option<Account>) -> AccountEntry {
AccountEntry {
account: account,
state: AccountState::Dirty,
}
}
// Create a new account entry and mark it as clean.
fn new_clean(account: Option<Account>) -> AccountEntry {
AccountEntry {
account: account,
state: AccountState::CleanFresh,
}
}
// Create a new account entry and mark it as clean and cached.
fn new_clean_cached(account: Option<Account>) -> AccountEntry {
AccountEntry {
account: account,
state: AccountState::CleanCached,
}
}
// Replace data with another entry but preserve storage cache.
fn overwrite_with(&mut self, other: AccountEntry) {
self.state = other.state;
match other.account {
Some(acc) => match self.account {
Some(ref mut ours) => {
ours.overwrite_with(acc);
},
None => {},
},
None => self.account = None,
}
}
}
@@ -90,6 +142,9 @@ impl AccountEntry {
/// locally from previous commits. Global cache reflects the database
/// state and never contains any changes.
///
/// Cache items contains account data, or the flag that account does not exist
/// and modification state (see `AccountState`)
///
/// Account data can be in the following cache states:
/// * In global but not local - something that was queried from the database,
/// but never modified
@@ -103,12 +158,32 @@ impl AccountEntry {
/// then global state cache. If data is not found in any of the caches
/// it is loaded from the DB to the local cache.
///
/// Upon destruction all the local cache data merged into the global cache.
/// The merge might be rejected if current state is non-canonical.
/// **** IMPORTANT *************************************************************
/// All the modifications to the account data must set the `Dirty` state in the
/// `AccountEntry`. This is done in `require` and `require_or_from`. So just
/// use that.
/// ****************************************************************************
///
/// Upon destruction all the local cache data propagated into the global cache.
/// Propagated items might be rejected if current state is non-canonical.
///
/// State snapshotting.
///
/// A new snapshot can be created with `snapshot()`. Snapshots can be
/// created in a hierarchy.
/// When a snapshot is active all changes are applied directly into
/// `cache` and the original value is copied into an active snapshot.
/// Reverting a snapshot with `revert_to_snapshot` involves copying
/// original values from the latest snapshot back into `cache`. The code
/// takes care not to overwrite cached storage while doing that.
/// Snapshot can be discateded with `discard_snapshot`. All of the orignal
/// backed-up values are moved into a parent snapshot (if any).
///
pub struct State {
db: StateDB,
root: H256,
cache: RefCell<HashMap<Address, AccountEntry>>,
// The original account is preserved in
snapshots: RefCell<Vec<HashMap<Address, Option<AccountEntry>>>>,
account_start_nonce: U256,
factories: Factories,
@@ -162,35 +237,48 @@ impl State {
Ok(state)
}
/// Create a recoverable snaphot of this state
/// Create a recoverable snaphot of this state.
pub fn snapshot(&mut self) {
self.snapshots.borrow_mut().push(HashMap::new());
}
/// Merge last snapshot with previous
pub fn clear_snapshot(&mut self) {
/// Merge last snapshot with previous.
pub fn discard_snapshot(&mut self) {
// merge with previous snapshot
let last = self.snapshots.borrow_mut().pop();
if let Some(mut snapshot) = last {
if let Some(ref mut prev) = self.snapshots.borrow_mut().last_mut() {
for (k, v) in snapshot.drain() {
prev.entry(k).or_insert(v);
if prev.is_empty() {
**prev = snapshot;
} else {
for (k, v) in snapshot.drain() {
prev.entry(k).or_insert(v);
}
}
}
}
}
/// Revert to snapshot
pub fn revert_snapshot(&mut self) {
/// Revert to the last snapshot and discard it.
pub fn revert_to_snapshot(&mut self) {
if let Some(mut snapshot) = self.snapshots.borrow_mut().pop() {
for (k, v) in snapshot.drain() {
match v {
Some(v) => {
self.cache.borrow_mut().insert(k, v);
match self.cache.borrow_mut().entry(k) {
Entry::Occupied(mut e) => {
// Merge snapshotted changes back into the main account
// storage preserving the cache.
e.get_mut().overwrite_with(v);
},
Entry::Vacant(e) => {
e.insert(v);
}
}
},
None => {
match self.cache.borrow_mut().entry(k) {
::std::collections::hash_map::Entry::Occupied(e) => {
Entry::Occupied(e) => {
if e.get().is_dirty() {
e.remove();
}
@@ -204,10 +292,17 @@ impl State {
}
fn insert_cache(&self, address: &Address, account: AccountEntry) {
if let Some(ref mut snapshot) = self.snapshots.borrow_mut().last_mut() {
if !snapshot.contains_key(address) {
snapshot.insert(address.clone(), self.cache.borrow_mut().insert(address.clone(), account));
return;
// Dirty account which is not in the cache means this is a new account.
// It goes directly into the snapshot as there's nothing to rever to.
//
// In all other cases account is read as clean first, and after that made
// dirty in and added to the snapshot with `note_cache`.
if account.is_dirty() {
if let Some(ref mut snapshot) = self.snapshots.borrow_mut().last_mut() {
if !snapshot.contains_key(address) {
snapshot.insert(address.clone(), self.cache.borrow_mut().insert(address.clone(), account));
return;
}
}
}
self.cache.borrow_mut().insert(address.clone(), account);
@@ -216,14 +311,14 @@ impl State {
fn note_cache(&self, address: &Address) {
if let Some(ref mut snapshot) = self.snapshots.borrow_mut().last_mut() {
if !snapshot.contains_key(address) {
snapshot.insert(address.clone(), self.cache.borrow().get(address).map(AccountEntry::clone_for_snapshot));
snapshot.insert(address.clone(), self.cache.borrow().get(address).map(AccountEntry::clone_dirty));
}
}
}
/// Destroy the current object and return root and database.
pub fn drop(mut self) -> (H256, StateDB) {
self.commit_cache();
self.propagate_to_global_cache();
(self.root, self.db)
}
@@ -235,12 +330,12 @@ impl State {
/// Create a new contract at address `contract`. If there is already an account at the address
/// it will have its code reset, ready for `init_code()`.
pub fn new_contract(&mut self, contract: &Address, balance: U256) {
self.insert_cache(contract, AccountEntry::Cached(Account::new_contract(balance, self.account_start_nonce)));
self.insert_cache(contract, AccountEntry::new_dirty(Some(Account::new_contract(balance, self.account_start_nonce))));
}
/// Remove an existing account.
pub fn kill_account(&mut self, account: &Address) {
self.insert_cache(account, AccountEntry::Killed);
self.insert_cache(account, AccountEntry::new_dirty(None));
}
/// Determine whether an account exists.
@@ -272,8 +367,8 @@ impl State {
let local_cache = self.cache.borrow_mut();
let mut local_account = None;
if let Some(maybe_acc) = local_cache.get(address) {
match *maybe_acc {
AccountEntry::Cached(ref account) => {
match maybe_acc.account {
Some(ref account) => {
if let Some(value) = account.cached_storage_at(key) {
return value;
} else {
@@ -292,7 +387,7 @@ impl State {
return result;
}
if let Some(ref mut acc) = local_account {
if let AccountEntry::Cached(ref account) = **acc {
if let Some(ref account) = acc.account {
let account_db = self.factories.accountdb.readonly(self.db.as_hashdb(), account.address_hash(address));
return account.storage_at(account_db.as_hashdb(), key)
} else {
@@ -314,10 +409,7 @@ impl State {
let account_db = self.factories.accountdb.readonly(self.db.as_hashdb(), a.address_hash(address));
a.storage_at(account_db.as_hashdb(), key)
});
match maybe_acc {
Some(account) => self.insert_cache(address, AccountEntry::Cached(account)),
None => self.insert_cache(address, AccountEntry::Missing),
}
self.insert_cache(address, AccountEntry::new_clean(maybe_acc));
r
}
@@ -341,13 +433,17 @@ impl State {
/// Add `incr` to the balance of account `a`.
pub fn add_balance(&mut self, a: &Address, incr: &U256) {
trace!(target: "state", "add_balance({}, {}): {}", a, incr, self.balance(a));
self.require(a, false).add_balance(incr);
if !incr.is_zero() || !self.exists(a) {
self.require(a, false).add_balance(incr);
}
}
/// Subtract `decr` from the balance of account `a`.
pub fn sub_balance(&mut self, a: &Address, decr: &U256) {
trace!(target: "state", "sub_balance({}, {}): {}", a, decr, self.balance(a));
self.require(a, false).sub_balance(decr);
if !decr.is_zero() || !self.exists(a) {
self.require(a, false).sub_balance(decr);
}
}
/// Subtracts `by` from the balance of `from` and adds it to that of `to`.
@@ -363,7 +459,9 @@ impl State {
/// Mutate storage of account `a` so that it is `value` for `key`.
pub fn set_storage(&mut self, a: &Address, key: H256, value: H256) {
self.require(a, false).set_storage(key, value)
if self.storage_at(a, &key) != value {
self.require(a, false).set_storage(key, value)
}
}
/// Initialise the code of account `a` so that it is `code`.
@@ -404,10 +502,9 @@ impl State {
accounts: &mut HashMap<Address, AccountEntry>
) -> Result<(), Error> {
// first, commit the sub trees.
// TODO: is this necessary or can we dispense with the `ref mut a` for just `a`?
for (address, ref mut a) in accounts.iter_mut() {
match a {
&mut&mut AccountEntry::Cached(ref mut account) if account.is_dirty() => {
for (address, ref mut a) in accounts.iter_mut().filter(|&(_, ref a)| a.is_dirty()) {
match a.account {
Some(ref mut account) => {
db.note_account_bloom(&address);
let addr_hash = account.address_hash(address);
let mut account_db = factories.accountdb.create(db.as_hashdb_mut(), addr_hash);
@@ -420,17 +517,15 @@ impl State {
{
let mut trie = factories.trie.from_existing(db.as_hashdb_mut(), root).unwrap();
for (address, ref mut a) in accounts.iter_mut() {
match **a {
AccountEntry::Cached(ref mut account) if account.is_dirty() => {
account.set_clean();
for (address, ref mut a) in accounts.iter_mut().filter(|&(_, ref a)| a.is_dirty()) {
a.state = AccountState::Committed;
match a.account {
Some(ref mut account) => {
try!(trie.insert(address, &account.rlp()));
},
AccountEntry::Killed => {
None => {
try!(trie.remove(address));
**a = AccountEntry::Missing;
},
_ => {},
}
}
}
@@ -438,20 +533,12 @@ impl State {
Ok(())
}
fn commit_cache(&mut self) {
/// Propagate local cache into shared canonical state cache.
fn propagate_to_global_cache(&mut self) {
let mut addresses = self.cache.borrow_mut();
for (address, a) in addresses.drain() {
match a {
AccountEntry::Cached(account) => {
if !account.is_dirty() {
self.db.cache_account(address, Some(account));
}
},
AccountEntry::Missing => {
self.db.cache_account(address, None);
},
_ => {},
}
trace!("Committing cache {:?} entries", addresses.len());
for (address, a) in addresses.drain().filter(|&(_, ref a)| a.state == AccountState::Committed || a.state == AccountState::CleanFresh) {
self.db.add_to_account_cache(address, a.account, a.state == AccountState::Committed);
}
}
@@ -473,7 +560,7 @@ impl State {
assert!(self.snapshots.borrow().is_empty());
for (add, acc) in accounts.drain().into_iter() {
self.db.note_account_bloom(&add);
self.cache.borrow_mut().insert(add, AccountEntry::Cached(Account::from_pod(acc)));
self.cache.borrow_mut().insert(add, AccountEntry::new_dirty(Some(Account::from_pod(acc))));
}
}
@@ -483,7 +570,7 @@ impl State {
// TODO: handle database rather than just the cache.
// will need fat db.
PodState::from(self.cache.borrow().iter().fold(BTreeMap::new(), |mut m, (add, opt)| {
if let AccountEntry::Cached(ref acc) = *opt {
if let Some(ref acc) = opt.account {
m.insert(add.clone(), PodAccount::from_account(acc));
}
m
@@ -530,7 +617,7 @@ impl State {
where F: Fn(Option<&Account>) -> U {
// check local cache first
if let Some(ref mut maybe_acc) = self.cache.borrow_mut().get_mut(a) {
if let AccountEntry::Cached(ref mut account) = **maybe_acc {
if let Some(ref mut account) = maybe_acc.account {
let accountdb = self.factories.accountdb.readonly(self.db.as_hashdb(), account.address_hash(a));
Self::update_account_cache(require, account, accountdb.as_hashdb());
return f(Some(account));
@@ -562,10 +649,7 @@ impl State {
Self::update_account_cache(require, account, accountdb.as_hashdb());
}
let r = f(maybe_acc.as_ref());
match maybe_acc {
Some(account) => self.insert_cache(a, AccountEntry::Cached(account)),
None => self.insert_cache(a, AccountEntry::Missing),
}
self.insert_cache(a, AccountEntry::new_clean(maybe_acc));
r
}
}
@@ -584,36 +668,38 @@ impl State {
let contains_key = self.cache.borrow().contains_key(a);
if !contains_key {
match self.db.get_cached_account(a) {
Some(Some(acc)) => self.insert_cache(a, AccountEntry::Cached(acc)),
Some(None) => self.insert_cache(a, AccountEntry::Missing),
Some(acc) => self.insert_cache(a, AccountEntry::new_clean_cached(acc)),
None => {
let maybe_acc = if self.db.check_account_bloom(a) {
let db = self.factories.trie.readonly(self.db.as_hashdb(), &self.root).expect(SEC_TRIE_DB_UNWRAP_STR);
let maybe_acc = match db.get(a) {
Ok(Some(acc)) => AccountEntry::Cached(Account::from_rlp(acc)),
Ok(None) => AccountEntry::Missing,
Ok(Some(acc)) => AccountEntry::new_clean(Some(Account::from_rlp(acc))),
Ok(None) => AccountEntry::new_clean(None),
Err(e) => panic!("Potential DB corruption encountered: {}", e),
};
maybe_acc
}
else {
AccountEntry::Missing
AccountEntry::new_clean(None)
};
self.insert_cache(a, maybe_acc);
}
}
} else {
self.note_cache(a);
}
match self.cache.borrow_mut().get_mut(a).unwrap() {
&mut AccountEntry::Cached(ref mut acc) => not_default(acc),
slot => *slot = AccountEntry::Cached(default()),
}
self.note_cache(a);
match &mut self.cache.borrow_mut().get_mut(a).unwrap().account {
&mut Some(ref mut acc) => not_default(acc),
slot => *slot = Some(default()),
}
// at this point the account is guaranteed to be in the cache.
RefMut::map(self.cache.borrow_mut(), |c| {
match c.get_mut(a).unwrap() {
&mut AccountEntry::Cached(ref mut account) => {
let mut entry = c.get_mut(a).unwrap();
// set the dirty flag after changing account data.
entry.state = AccountState::Dirty;
match entry.account {
Some(ref mut account) => {
if require_code {
let addr_hash = account.address_hash(a);
let accountdb = self.factories.accountdb.readonly(self.db.as_hashdb(), addr_hash);
@@ -638,7 +724,7 @@ impl Clone for State {
let cache = {
let mut cache: HashMap<Address, AccountEntry> = HashMap::new();
for (key, val) in self.cache.borrow().iter() {
if let Some(entry) = val.clone_dirty() {
if let Some(entry) = val.clone_if_dirty() {
cache.insert(key.clone(), entry);
}
}
@@ -1679,12 +1765,12 @@ fn snapshot_basic() {
state.snapshot();
state.add_balance(&a, &U256::from(69u64));
assert_eq!(state.balance(&a), U256::from(69u64));
state.clear_snapshot();
state.discard_snapshot();
assert_eq!(state.balance(&a), U256::from(69u64));
state.snapshot();
state.add_balance(&a, &U256::from(1u64));
assert_eq!(state.balance(&a), U256::from(70u64));
state.revert_snapshot();
state.revert_to_snapshot();
assert_eq!(state.balance(&a), U256::from(69u64));
}
@@ -1697,9 +1783,9 @@ fn snapshot_nested() {
state.snapshot();
state.add_balance(&a, &U256::from(69u64));
assert_eq!(state.balance(&a), U256::from(69u64));
state.clear_snapshot();
state.discard_snapshot();
assert_eq!(state.balance(&a), U256::from(69u64));
state.revert_snapshot();
state.revert_to_snapshot();
assert_eq!(state.balance(&a), U256::from(0));
}

View File

@@ -14,56 +14,94 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{VecDeque, HashSet};
use lru_cache::LruCache;
use util::journaldb::JournalDB;
use util::hash::{H256};
use util::hashdb::HashDB;
use state::Account;
use header::BlockNumber;
use util::{Arc, Address, Database, DBTransaction, UtilError, Mutex, Hashable};
use bloom_journal::{Bloom, BloomJournal};
use db::COL_ACCOUNT_BLOOM;
use byteorder::{LittleEndian, ByteOrder};
const STATE_CACHE_ITEMS: usize = 65536;
const STATE_CACHE_ITEMS: usize = 256000;
const STATE_CACHE_BLOCKS: usize = 8;
pub const ACCOUNT_BLOOM_SPACE: usize = 1048576;
pub const DEFAULT_ACCOUNT_PRESET: usize = 1000000;
pub const ACCOUNT_BLOOM_HASHCOUNT_KEY: &'static [u8] = b"account_hash_count";
/// Shared canonical state cache.
struct AccountCache {
/// DB Account cache. `None` indicates that account is known to be missing.
accounts: LruCache<Address, Option<Account>>,
/// Information on the modifications in recently committed blocks; specifically which addresses
/// changed in which block. Ordered by block number.
modifications: VecDeque<BlockChanges>,
}
/// Buffered account cache item.
struct CacheQueueItem {
/// Account address.
address: Address,
/// Acccount data or `None` if account does not exist.
account: Option<Account>,
/// Indicates that the account was modified before being
/// added to the cache.
modified: bool,
}
#[derive(Debug)]
/// Accumulates a list of accounts changed in a block.
struct BlockChanges {
/// Block number.
number: BlockNumber,
/// Block hash.
hash: H256,
/// Parent block hash.
parent: H256,
/// A set of modified account addresses.
accounts: HashSet<Address>,
/// Block is part of the canonical chain.
is_canon: bool,
}
/// State database abstraction.
/// Manages shared global state cache.
/// Manages shared global state cache which reflects the canonical
/// state as it is on the disk. All the entries in the cache are clean.
/// A clone of `StateDB` may be created as canonical or not.
/// For canonical clones cache changes are accumulated and applied
/// on commit.
/// For non-canonical clones cache is cleared on commit.
/// For canonical clones local cache is accumulated and applied
/// in `sync_cache`
/// For non-canonical clones local cache is dropped.
///
/// Global cache propagation.
/// After a `State` object has been committed to the trie it
/// propagates its local cache into the `StateDB` local cache
/// using `add_to_account_cache` function.
/// Then, after the block has been added to the chain the local cache in the
/// `StateDB` is propagated into the global cache.
pub struct StateDB {
/// Backing database.
db: Box<JournalDB>,
/// Shared canonical state cache.
account_cache: Arc<Mutex<AccountCache>>,
cache_overlay: Vec<(Address, Option<Account>)>,
is_canon: bool,
/// Local dirty cache.
local_cache: Vec<CacheQueueItem>,
/// Shared account bloom. Does not handle chain reorganizations.
account_bloom: Arc<Mutex<Bloom>>,
/// Hash of the block on top of which this instance was created or
/// `None` if cache is disabled
parent_hash: Option<H256>,
/// Hash of the committing block or `None` if not committed yet.
commit_hash: Option<H256>,
/// Number of the committing block or `None` if not committed yet.
commit_number: Option<BlockNumber>,
}
impl StateDB {
/// Create a new instance wrapping `JournalDB`
pub fn new(db: Box<JournalDB>) -> StateDB {
let bloom = Self::load_bloom(db.backing());
StateDB {
db: db,
account_cache: Arc::new(Mutex::new(AccountCache { accounts: LruCache::new(STATE_CACHE_ITEMS) })),
cache_overlay: Vec::new(),
is_canon: false,
account_bloom: Arc::new(Mutex::new(bloom)),
}
}
/// Loads accounts bloom from the database
/// This bloom is used to handle request for the non-existant account fast
pub fn load_bloom(db: &Database) -> Bloom {
@@ -91,6 +129,23 @@ impl StateDB {
bloom
}
/// Create a new instance wrapping `JournalDB`
pub fn new(db: Box<JournalDB>) -> StateDB {
let bloom = Self::load_bloom(db.backing());
StateDB {
db: db,
account_cache: Arc::new(Mutex::new(AccountCache {
accounts: LruCache::new(STATE_CACHE_ITEMS),
modifications: VecDeque::new(),
})),
local_cache: Vec::new(),
account_bloom: Arc::new(Mutex::new(bloom)),
parent_hash: None,
commit_hash: None,
commit_number: None,
}
}
pub fn check_account_bloom(&self, address: &Address) -> bool {
trace!(target: "account_bloom", "Check account bloom: {:?}", address);
let bloom = self.account_bloom.lock();
@@ -125,14 +180,107 @@ impl StateDB {
try!(Self::commit_bloom(batch, bloom_lock.drain_journal()));
}
let records = try!(self.db.commit(batch, now, id, end));
if self.is_canon {
self.commit_cache();
} else {
self.clear_cache();
}
self.commit_hash = Some(id.clone());
self.commit_number = Some(now);
Ok(records)
}
/// Propagate local cache into the global cache and synchonize
/// the global cache with the best block state.
/// This function updates the global cache by removing entries
/// that are invalidated by chain reorganization. `sync_cache`
/// should be called after the block has been committed and the
/// blockchain route has ben calculated.
pub fn sync_cache(&mut self, enacted: &[H256], retracted: &[H256], is_best: bool) {
trace!("sync_cache id = (#{:?}, {:?}), parent={:?}, best={}", self.commit_number, self.commit_hash, self.parent_hash, is_best);
let mut cache = self.account_cache.lock();
let mut cache = &mut *cache;
// Purge changes from re-enacted and retracted blocks.
// Filter out commiting block if any.
let mut clear = false;
for block in enacted.iter().filter(|h| self.commit_hash.as_ref().map_or(true, |p| *h != p)) {
clear = clear || {
if let Some(ref mut m) = cache.modifications.iter_mut().find(|ref m| &m.hash == block) {
trace!("Reverting enacted block {:?}", block);
m.is_canon = true;
for a in &m.accounts {
trace!("Reverting enacted address {:?}", a);
cache.accounts.remove(a);
}
false
} else {
true
}
};
}
for block in retracted {
clear = clear || {
if let Some(ref mut m) = cache.modifications.iter_mut().find(|ref m| &m.hash == block) {
trace!("Retracting block {:?}", block);
m.is_canon = false;
for a in &m.accounts {
trace!("Retracted address {:?}", a);
cache.accounts.remove(a);
}
false
} else {
true
}
};
}
if clear {
// We don't know anything about the block; clear everything
trace!("Wiping cache");
cache.accounts.clear();
cache.modifications.clear();
}
// Propagate cache only if committing on top of the latest canonical state
// blocks are ordered by number and only one block with a given number is marked as canonical
// (contributed to canonical state cache)
if let (Some(ref number), Some(ref hash), Some(ref parent)) = (self.commit_number, self.commit_hash, self.parent_hash) {
if cache.modifications.len() == STATE_CACHE_BLOCKS {
cache.modifications.pop_back();
}
let mut modifications = HashSet::new();
trace!("committing {} cache entries", self.local_cache.len());
for account in self.local_cache.drain(..) {
if account.modified {
modifications.insert(account.address.clone());
}
if is_best {
if let Some(&mut Some(ref mut existing)) = cache.accounts.get_mut(&account.address) {
if let Some(new) = account.account {
if account.modified {
existing.overwrite_with(new);
}
continue;
}
}
cache.accounts.insert(account.address, account.account);
}
}
// Save modified accounts. These are ordered by the block number.
let block_changes = BlockChanges {
accounts: modifications,
number: *number,
hash: hash.clone(),
is_canon: is_best,
parent: parent.clone(),
};
let insert_at = cache.modifications.iter().enumerate().find(|&(_, ref m)| m.number < *number).map(|(i, _)| i);
trace!("inserting modifications at {:?}", insert_at);
if let Some(insert_at) = insert_at {
cache.modifications.insert(insert_at, block_changes);
} else {
cache.modifications.push_back(block_changes);
}
}
}
/// Returns an interface to HashDB.
pub fn as_hashdb(&self) -> &HashDB {
self.db.as_hashdb()
@@ -148,20 +296,24 @@ impl StateDB {
StateDB {
db: self.db.boxed_clone(),
account_cache: self.account_cache.clone(),
cache_overlay: Vec::new(),
is_canon: false,
local_cache: Vec::new(),
account_bloom: self.account_bloom.clone(),
parent_hash: None,
commit_hash: None,
commit_number: None,
}
}
/// Clone the database for a canonical state.
pub fn boxed_clone_canon(&self) -> StateDB {
pub fn boxed_clone_canon(&self, parent: &H256) -> StateDB {
StateDB {
db: self.db.boxed_clone(),
account_cache: self.account_cache.clone(),
cache_overlay: Vec::new(),
is_canon: true,
local_cache: Vec::new(),
account_bloom: self.account_bloom.clone(),
parent_hash: Some(parent.clone()),
commit_hash: None,
commit_number: None,
}
}
@@ -180,53 +332,149 @@ impl StateDB {
&*self.db
}
/// Enqueue cache change.
pub fn cache_account(&mut self, addr: Address, data: Option<Account>) {
self.cache_overlay.push((addr, data));
}
/// Apply pending cache changes.
fn commit_cache(&mut self) {
let mut cache = self.account_cache.lock();
for (address, account) in self.cache_overlay.drain(..) {
if let Some(&mut Some(ref mut existing)) = cache.accounts.get_mut(&address) {
if let Some(new) = account {
existing.merge_with(new);
continue;
}
}
cache.accounts.insert(address, account);
}
}
/// Clear the cache.
pub fn clear_cache(&mut self) {
self.cache_overlay.clear();
let mut cache = self.account_cache.lock();
cache.accounts.clear();
/// Add a local cache entry.
/// The entry will be propagated to the global cache in `sync_cache`.
/// `modified` indicates that the entry was changed since being read from disk or global cache.
/// `data` can be set to an existing (`Some`), or non-existing account (`None`).
pub fn add_to_account_cache(&mut self, addr: Address, data: Option<Account>, modified: bool) {
self.local_cache.push(CacheQueueItem {
address: addr,
account: data,
modified: modified,
})
}
/// Get basic copy of the cached account. Does not include storage.
/// Returns 'None' if the state is non-canonical and cache is disabled
/// or if the account is not cached.
/// Returns 'None' if cache is disabled or if the account is not cached.
pub fn get_cached_account(&self, addr: &Address) -> Option<Option<Account>> {
if !self.is_canon {
let mut cache = self.account_cache.lock();
if !Self::is_allowed(addr, &self.parent_hash, &cache.modifications) {
return None;
}
let mut cache = self.account_cache.lock();
cache.accounts.get_mut(&addr).map(|a| a.as_ref().map(|a| a.clone_basic()))
}
/// Get value from a cached account.
/// Returns 'None' if the state is non-canonical and cache is disabled
/// or if the account is not cached.
/// Returns 'None' if cache is disabled or if the account is not cached.
pub fn get_cached<F, U>(&self, a: &Address, f: F) -> Option<U>
where F: FnOnce(Option<&mut Account>) -> U {
if !self.is_canon {
let mut cache = self.account_cache.lock();
if !Self::is_allowed(a, &self.parent_hash, &cache.modifications) {
return None;
}
let mut cache = self.account_cache.lock();
cache.accounts.get_mut(a).map(|c| f(c.as_mut()))
}
/// Check if the account can be returned from cache by matching current block parent hash against canonical
/// state and filtering out account modified in later blocks.
fn is_allowed(addr: &Address, parent_hash: &Option<H256>, modifications: &VecDeque<BlockChanges>) -> bool {
let mut parent = match *parent_hash {
None => {
trace!("Cache lookup skipped for {:?}: no parent hash", addr);
return false;
}
Some(ref parent) => parent,
};
if modifications.is_empty() {
return true;
}
// Ignore all accounts modified in later blocks
// Modifications contains block ordered by the number
// We search for our parent in that list first and then for
// all its parent until we hit the canonical block,
// checking against all the intermediate modifications.
let mut iter = modifications.iter();
while let Some(ref m) = iter.next() {
if &m.hash == parent {
if m.is_canon {
return true;
}
parent = &m.parent;
}
if m.accounts.contains(addr) {
trace!("Cache lookup skipped for {:?}: modified in a later block", addr);
return false;
}
}
trace!("Cache lookup skipped for {:?}: parent hash is unknown", addr);
return false;
}
}
#[cfg(test)]
mod tests {
use util::{U256, H256, FixedHash, Address, DBTransaction};
use tests::helpers::*;
use state::Account;
use util::log::init_log;
#[test]
fn state_db_smoke() {
init_log();
let mut state_db_result = get_temp_state_db();
let state_db = state_db_result.take();
let root_parent = H256::random();
let address = Address::random();
let h0 = H256::random();
let h1a = H256::random();
let h1b = H256::random();
let h2a = H256::random();
let h2b = H256::random();
let h3a = H256::random();
let h3b = H256::random();
let mut batch = DBTransaction::new(state_db.journal_db().backing());
// blocks [ 3a(c) 2a(c) 2b 1b 1a(c) 0 ]
// balance [ 5 5 4 3 2 2 ]
let mut s = state_db.boxed_clone_canon(&root_parent);
s.add_to_account_cache(address, Some(Account::new_basic(2.into(), 0.into())), false);
s.commit(&mut batch, 0, &h0, None).unwrap();
s.sync_cache(&[], &[], true);
let mut s = state_db.boxed_clone_canon(&h0);
s.commit(&mut batch, 1, &h1a, None).unwrap();
s.sync_cache(&[], &[], true);
let mut s = state_db.boxed_clone_canon(&h0);
s.add_to_account_cache(address, Some(Account::new_basic(3.into(), 0.into())), true);
s.commit(&mut batch, 1, &h1b, None).unwrap();
s.sync_cache(&[], &[], false);
let mut s = state_db.boxed_clone_canon(&h1b);
s.add_to_account_cache(address, Some(Account::new_basic(4.into(), 0.into())), true);
s.commit(&mut batch, 2, &h2b, None).unwrap();
s.sync_cache(&[], &[], false);
let mut s = state_db.boxed_clone_canon(&h1a);
s.add_to_account_cache(address, Some(Account::new_basic(5.into(), 0.into())), true);
s.commit(&mut batch, 2, &h2a, None).unwrap();
s.sync_cache(&[], &[], true);
let mut s = state_db.boxed_clone_canon(&h2a);
s.commit(&mut batch, 3, &h3a, None).unwrap();
s.sync_cache(&[], &[], true);
let s = state_db.boxed_clone_canon(&h3a);
assert_eq!(s.get_cached_account(&address).unwrap().unwrap().balance(), &U256::from(5));
let s = state_db.boxed_clone_canon(&h1a);
assert!(s.get_cached_account(&address).is_none());
let s = state_db.boxed_clone_canon(&h2b);
assert!(s.get_cached_account(&address).is_none());
let s = state_db.boxed_clone_canon(&h1b);
assert!(s.get_cached_account(&address).is_none());
// reorg to 3b
// blocks [ 3b(c) 3a 2a 2b(c) 1b 1a 0 ]
let mut s = state_db.boxed_clone_canon(&h2b);
s.commit(&mut batch, 3, &h3b, None).unwrap();
s.sync_cache(&[h1b.clone(), h2b.clone(), h3b.clone()], &[h1a.clone(), h2a.clone(), h3a.clone()], true);
let s = state_db.boxed_clone_canon(&h3a);
assert!(s.get_cached_account(&address).is_none());
}
}

View File

@@ -57,7 +57,11 @@ fn should_return_registrar() {
IoChannel::disconnected(),
&db_config
).unwrap();
assert_eq!(client.additional_params().get("registrar"), Some(&"52dff57a8a1532e6afb3dc07e2af58bb9eb05b3d".to_owned()));
let params = client.additional_params();
let address = params.get("registrar").unwrap();
assert_eq!(address.len(), 40);
assert!(U256::from_str(address).is_ok());
}
#[test]

View File

@@ -16,4 +16,5 @@
pub mod helpers;
mod client;
#[cfg(feature="ipc")]
mod rpc;

View File

@@ -19,7 +19,8 @@
use nanoipc;
use std::sync::Arc;
use std::sync::atomic::{Ordering, AtomicBool};
use client::{Client, BlockChainClient, ClientConfig, RemoteClient, BlockID};
use client::{Client, BlockChainClient, ClientConfig, BlockID};
use client::remote::RemoteClient;
use tests::helpers::*;
use devtools::*;
use miner::Miner;

View File

@@ -256,16 +256,6 @@ impl<T> TraceDatabase for TraceDB<T> where T: DatabaseExtras {
return;
}
// at first, let's insert new block traces
{
let mut traces = self.traces.write();
// it's important to use overwrite here,
// cause this value might be queried by hash later
batch.write_with_cache(db::COL_TRACE, &mut *traces, request.block_hash, request.traces, CacheUpdatePolicy::Overwrite);
// note_used must be called after locking traces to avoid cache/traces deadlock on garbage collection
self.note_used(CacheID::Trace(request.block_hash.clone()));
}
// now let's rebuild the blooms
if !request.enacted.is_empty() {
let range_start = request.block_number as Number + 1 - request.enacted.len();
@@ -276,12 +266,25 @@ impl<T> TraceDatabase for TraceDB<T> where T: DatabaseExtras {
// all traces are expected to be found here. That's why `expect` has been used
// instead of `filter_map`. If some traces haven't been found, it meens that
// traces database is corrupted or incomplete.
.map(|block_hash| self.traces(block_hash).expect("Traces database is incomplete."))
.map(|block_traces| block_traces.bloom())
.map(|block_hash| if block_hash == &request.block_hash {
request.traces.bloom()
} else {
self.traces(block_hash).expect("Traces database is incomplete.").bloom()
})
.map(blooms::Bloom::from)
.map(Into::into)
.collect();
// insert new block traces into the cache and the database
{
let mut traces = self.traces.write();
// it's important to use overwrite here,
// cause this value might be queried by hash later
batch.write_with_cache(db::COL_TRACE, &mut *traces, request.block_hash, request.traces, CacheUpdatePolicy::Overwrite);
// note_used must be called after locking traces to avoid cache/traces deadlock on garbage collection
self.note_used(CacheID::Trace(request.block_hash.clone()));
}
let chain = BloomGroupChain::new(self.bloom_config, self);
let trace_blooms = chain.replace(&replaced_range, enacted_blooms);
let blooms_to_insert = trace_blooms.into_iter()

View File

@@ -22,7 +22,7 @@ use client::BlockID;
use log_entry::LogEntry;
/// Blockchain Filter.
#[derive(Binary)]
#[derive(Binary, Debug, PartialEq)]
pub struct Filter {
/// Blockchain will be searched from this block.
pub from_block: BlockID,