Refactor pending_block to always return invalid txs and sometimes a block.

Docuemnt SealingWork properly.
This commit is contained in:
Gav Wood 2016-03-23 16:28:02 +00:00
parent 97449afbb9
commit 4e013ba2fc
7 changed files with 126 additions and 68 deletions

View File

@ -261,6 +261,10 @@ impl<'x> OpenBlock<'x> {
///
/// If valid, it will be executed, and archived together with the receipt.
pub fn push_transaction(&mut self, t: SignedTransaction, h: Option<H256>) -> Result<&Receipt, Error> {
if self.block.transactions_set.contains(t.hash()) {
return Err(From::from(ExecutionError::AlreadyImported));
}
let env_info = self.env_info();
// info!("env_info says gas_used={}", env_info.gas_used);
match self.block.state.apply(&env_info, self.engine, &t, self.block.traces.is_some()) {

View File

@ -426,17 +426,23 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
block.try_seal(self.engine.deref().deref(), seal)
}
// TODO: either work out a better API than this or refactor prepare_sealing and try_seal in terms of this.
fn with_engine<F, T>(&self, f: F) -> T where F: FnOnce(&Engine) -> T {
f(self.engine.deref().deref())
}
// TODO [todr] Should be moved to miner crate eventually.
fn prepare_sealing(&self, author: Address, gas_floor_target: U256, extra_data: Bytes, transactions: Vec<SignedTransaction>)
-> Option<(ClosedBlock, HashSet<H256>)> {
-> (Option<ClosedBlock>, HashSet<H256>) {
let engine = self.engine.deref().deref();
let h = self.chain.best_block_hash();
let mut invalid_transactions = HashSet::new();
let mut b = OpenBlock::new(
engine,
false, // TODO: this will need to be parameterised once we want to do immediate mining insertion.
self.state_db.lock().unwrap().spawn(),
match self.chain.block_header(&h) { Some(ref x) => x, None => {return None} },
match self.chain.block_header(&h) { Some(ref x) => x, None => { return (None, invalid_transactions) } },
self.build_last_hashes(h.clone()),
author,
gas_floor_target,
@ -456,7 +462,6 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
// Add transactions
let block_number = b.block().header().number();
let min_tx_gas = U256::from(self.engine.schedule(&b.env_info()).tx_gas);
let mut invalid_transactions = HashSet::new();
for tx in transactions {
// Push transaction to block
@ -488,7 +493,7 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
b.hash(),
b.block().header().difficulty()
);
Some((b, invalid_transactions))
(Some(b), invalid_transactions)
}
fn block_header(&self, id: BlockId) -> Option<Bytes> {

View File

@ -120,7 +120,7 @@ pub trait BlockChainClient : Sync + Send {
// TODO [todr] Should be moved to miner crate eventually.
/// Returns ClosedBlock prepared for sealing.
fn prepare_sealing(&self, author: Address, gas_floor_target: U256, extra_data: Bytes, transactions: Vec<SignedTransaction>)
-> Option<(ClosedBlock, HashSet<H256>)>;
-> (Option<ClosedBlock>, HashSet<H256>);
// TODO [todr] Should be moved to miner crate eventually.
/// Attempts to seal given block. Returns `SealedBlock` on success and the same block in case of error.
@ -128,5 +128,8 @@ pub trait BlockChainClient : Sync + Send {
/// Makes a non-persistent transaction call.
fn call(&self, t: &SignedTransaction) -> Result<Executed, Error>;
/// Executes a function providing it with a reference to an engine.
fn with_engine<F>(&self, _f: F) where F: FnOnce(&Engine) {}
}

View File

@ -248,8 +248,8 @@ impl BlockChainClient for TestBlockChainClient {
unimplemented!();
}
fn prepare_sealing(&self, _author: Address, _gas_floor_target: U256, _extra_data: Bytes, _transactions: Vec<SignedTransaction>) -> Option<(ClosedBlock, HashSet<H256>)> {
None
fn prepare_sealing(&self, _author: Address, _gas_floor_target: U256, _extra_data: Bytes, _transactions: Vec<SignedTransaction>) -> (Option<ClosedBlock>, HashSet<H256>) {
(None, vec![])
}
fn try_seal(&self, block: ClosedBlock, _seal: Vec<Bytes>) -> Result<SealedBlock, ClosedBlock> {

View File

@ -144,7 +144,7 @@ fn can_mine() {
let client_result = get_test_client_with_blocks(vec![dummy_blocks[0].clone()]);
let client = client_result.reference();
let b = client.prepare_sealing(Address::default(), x!(31415926), vec![], vec![]).unwrap().0;
let b = client.prepare_sealing(Address::default(), x!(31415926), vec![], vec![]).0.unwrap();
assert_eq!(*b.block().header().parent_hash(), BlockView::new(&dummy_blocks[0]).header_view().sha3());
assert!(client.try_seal(b, vec![]).is_ok());

View File

@ -28,6 +28,9 @@ use ethcore::error::{Error};
use ethcore::transaction::SignedTransaction;
use super::{MinerService, MinerStatus, TransactionQueue, AccountDetails};
/// Special ClosedBlock queue-like datastructure that includes the notion of
/// usage to avoid items that were queued but never used from making it into
/// the queue.
struct SealingWork {
/// Not yet being sealed by a miner, but if one asks for work, we'd prefer they do this.
would_seal: Option<ClosedBlock>,
@ -36,22 +39,19 @@ struct SealingWork {
}
impl SealingWork {
// inspect the work that would be given.
fn pending_ref(&self) -> Option<&ClosedBlock> {
/// Maximum length of the queue.
const MAX_SEALING_BLOCKS_CACHE = 5,
/// Return a reference to the item at the top of the queue (or `None` if the queue is empty);
/// it doesn't constitute noting that the item is used.
fn peek_last_ref(&self) -> Option<&ClosedBlock> {
self.would_seal.as_ref().or(self.being_sealed.last().as_ref())
}
// return the a reference to forst block that returns true to `f`.
fn find_if<F>(&self, f: F) -> Option<&ClosedBlock> where F: Fn(&ClosedBlock) -> bool {
if would_seal.as_ref().map(&f).unwrap_or(false) {
would_seal.as_ref()
} else {
being_sealed.iter().find_if(f)
}
}
// used for getting the work to be done.
fn use_pending_ref(&mut self) -> Option<&ClosedBlock> {
/// Return a reference to the item at the top of the queue (or `None` if the queue is empty);
/// this constitutes using the item and will remain in the queue for at least another
/// `MAX_SEALING_BLOCKS_CACHE` invocations of `push()`.
fn use_last_ref(&mut self) -> Option<&ClosedBlock> {
if let Some(x) = self.would_seal.take() {
self.being_sealed.push(x);
if self.being_sealed.len() > MAX_SEALING_BLOCKS_CACHE {
@ -61,14 +61,32 @@ impl SealingWork {
self.being_sealed.last().as_ref()
}
// set new work to be done.
fn set_pending(&mut self, b: ClosedBlock) {
/// Place an item on the end of the queue. The previously `push()`ed item will be removed
/// if `use_last_ref()` since it was `push()`ed.
fn push(&mut self, b: ClosedBlock) {
self.would_seal = Some(b);
}
// get the pending block if `f(pending)`. if there is no pending block or it doesn't pass `f`, None.
// will not destroy a block if a reference to it has previously been returned by `use_pending_ref`.
fn pending_if<F>(&self, f: F) -> Option<ClosedBlock> where F: Fn(&ClosedBlock) -> bool {
// Clears everything; the queue is entirely reset.
fn reset(&mut self) {
self.would_seal = None;
self.being_sealed.clear();
}
// Returns `Some` reference to first block that `f` returns `true` with it as a parameter.
// Returns `None` if no such block exists in the queue.
fn find_if<F>(&self, f: F) -> Option<&ClosedBlock> where F: Fn(&ClosedBlock) -> bool {
if would_seal.as_ref().map(&f).unwrap_or(false) {
would_seal.as_ref()
} else {
being_sealed.iter().find_if(f)
}
}
/// Returns the most recently pushed block if `f` returns `true` with a reference to it as
/// a parameter, otherwise `None`.
/// will not destroy a block if a reference to it has previously been returned by `use_last_ref`.
fn pending_if<F>(&mut self, f: F) -> Option<ClosedBlock> where F: Fn(&ClosedBlock) -> bool {
// a bit clumsy - TODO: think about a nicer way of expressing this.
if let Some(x) = self.would_seal.take() {
if f(&x) {
@ -78,20 +96,9 @@ impl SealingWork {
None
}
} else {
being_sealed.last().as_ref().filter(&b).map(|b| b.clone())
/* being_sealed.last().as_ref().and_then(|b| if f(b) {
Some(b.clone())
} else {
None
})*/
being_sealed.last().iter().cloned().filter(&b)
}
}
// clears everything.
fn reset(&mut self) {
self.would_seal = None;
self.being_sealed.clear();
}
}
/// Keeps track of transactions using priority queue and holds currently mined block.
@ -108,16 +115,6 @@ pub struct Miner {
}
/*
let sealing_work = self.sealing_work.lock();
// TODO: check to see if last ClosedBlock in would_seals is same.
// if so, duplicate, re-open and push any new transactions.
// if at least one was pushed successfully, close and enqueue new ClosedBlock;
// and remove first ClosedBlock from the queue..
*/
impl Default for Miner {
fn default() -> Miner {
Miner {
@ -179,23 +176,72 @@ impl Miner {
/// Prepares new block for sealing including top transactions from queue.
fn prepare_sealing(&self, chain: &BlockChainClient) {
let transactions = self.transaction_queue.lock().unwrap().top_transactions();
let b = chain.prepare_sealing(
self.author(),
self.gas_floor_target(),
self.extra_data(),
transactions,
);
let sealing_work = self.sealing_work.lock().unwrap();
let best_hash = chain.best_block_header().hash();
if let Some((block, invalid_transactions)) = b {
let mut queue = self.transaction_queue.lock().unwrap();
queue.remove_all(
&invalid_transactions.into_iter().collect::<Vec<H256>>(),
|a: &Address| AccountDetails {
nonce: chain.nonce(a),
balance: chain.balance(a),
}
);
self.sealing_work.lock().unwrap().set_pending(block);
/*
// check to see if last ClosedBlock in would_seals is actually same parent block.
// if so
// duplicate, re-open and push any new transactions.
// if at least one was pushed successfully, close and enqueue new ClosedBlock;
// otherwise, leave everything alone.
// otherwise, author a fresh block.
*/
let (b, invalid_transactions) = match sealing_work.pending_if(|b| b.block().fields().header().parent_hash() == best_hash) {
Some(mut old_block) => {
// add transactions to old_block
chain.with_engine(|e| {
let invalid_transactions = HashMap::new();
let block = old_block.reopen(e);
// TODO: push new uncles, too.
let mut have_one = false;
// TODO: refactor with chain.prepare_sealing
for t in transactions {
let hash = tx.hash();
let res = block.push_transaction(tx, None);
match import {
Err(Error::Execution(ExecutionError::BlockGasLimitReached { gas_limit, gas_used, .. })) => {
trace!(target: "miner", "Skipping adding transaction to block because of gas limit: {:?}", hash);
// Exit early if gas left is smaller then min_tx_gas
if gas_limit - gas_used < min_tx_gas {
break;
}
},
Err(Error::Execution(ExecutionError::AlreadyImported)) => {} // already have transaction - ignore
Err(e) => {
invalid_transactions.insert(hash);
trace!(target: "miner",
"Error adding transaction to block: number={}. transaction_hash={:?}, Error: {:?}",
block_number, hash, e);
},
_ => { have_one = true } // imported ok - note that.
}
}
(if have_one { Some(block.close()) } else { None }, invalid_transactions)
})
}
None => {
// block not found - create it.
chain.prepare_sealing(
self.author(),
self.gas_floor_target(),
self.extra_data(),
transactions,
)
}
}
let mut queue = self.transaction_queue.lock().unwrap();
queue.remove_all(
&invalid_transactions.into_iter().collect::<Vec<H256>>(),
|a: &Address| AccountDetails {
nonce: chain.nonce(a),
balance: chain.balance(a),
}
);
if let Some(block) = b {
self.sealing_work.lock().unwrap().push(block);
}
}
@ -221,7 +267,7 @@ impl MinerService for Miner {
MinerStatus {
transactions_in_pending_queue: status.pending,
transactions_in_future_queue: status.future,
transactions_in_pending_block: block.pending_ref().map_or(0, |b| b.transactions().len()),
transactions_in_pending_block: block.peek_last_ref().map_or(0, |b| b.transactions().len()),
}
}
@ -261,13 +307,13 @@ impl MinerService for Miner {
}
fn map_sealing_work<F, T>(&self, chain: &BlockChainClient, f: F) -> Option<T> where F: FnOnce(&ClosedBlock) -> T {
let have_work = self.sealing_work.lock().unwrap().pending_ref().is_none();
let have_work = self.sealing_work.lock().unwrap().peek_last_ref().is_none();
if !have_work {
self.sealing_enabled.store(true, atomic::Ordering::Relaxed);
self.prepare_sealing(chain);
}
*self.sealing_block_last_request.lock().unwrap() = chain.chain_info().best_block_number;
self.sealing_work.lock().unwrap().use_pending().map(f)
self.sealing_work.lock().unwrap().use_last_ref().map(f)
}
fn submit_seal(&self, chain: &BlockChainClient, pow_hash: H256, seal: Vec<Bytes>) -> Result<(), Error> {

View File

@ -41,7 +41,7 @@ fn default_gas() -> U256 {
}
fn default_gas_price() -> U256 {
shannon() * U256::from(50)
shannon() * U256::from(20)
}
/// Eth rpc implementation.