New mining framework.

Fixes #756.
This commit is contained in:
Gav Wood 2016-03-24 23:03:22 +01:00
parent 3be2763929
commit 830ef7ddfc
10 changed files with 105 additions and 148 deletions

View File

@ -101,6 +101,22 @@ pub struct BlockRefMut<'a> {
pub traces: &'a Option<Vec<Trace>>,
}
/// A set of immutable references to `ExecutedBlock` fields that are publicly accessible.
pub struct BlockRef<'a> {
/// Block header.
pub header: &'a Header,
/// Block transactions.
pub transactions: &'a Vec<SignedTransaction>,
/// Block uncles.
pub uncles: &'a Vec<Header>,
/// Transaction receipts.
pub receipts: &'a Vec<Receipt>,
/// State.
pub state: &'a State,
/// Traces.
pub traces: &'a Option<Vec<Trace>>,
}
impl ExecutedBlock {
/// Create a new block from the given `state`.
fn new(state: State, tracing: bool) -> ExecutedBlock {
@ -114,7 +130,7 @@ impl ExecutedBlock {
}
/// Get a structure containing individual references to all public fields.
pub fn fields(&mut self) -> BlockRefMut {
pub fn fields_mut(&mut self) -> BlockRefMut {
BlockRefMut {
header: &self.base.header,
transactions: &self.base.transactions,
@ -124,6 +140,18 @@ impl ExecutedBlock {
traces: &self.traces,
}
}
/// Get a structure containing individual references to all public fields.
pub fn fields(&self) -> BlockRef {
BlockRef {
header: &self.base.header,
transactions: &self.base.transactions,
uncles: &self.base.uncles,
state: &self.state,
receipts: &self.receipts,
traces: &self.traces,
}
}
}
/// Trait for a object that is_a `ExecutedBlock`.
@ -261,8 +289,8 @@ impl<'x> OpenBlock<'x> {
///
/// If valid, it will be executed, and archived together with the receipt.
pub fn push_transaction(&mut self, t: SignedTransaction, h: Option<H256>) -> Result<&Receipt, Error> {
if self.block.transactions_set.contains(t.hash()) {
return Err(From::from(ExecutionError::AlreadyImported));
if self.block.transactions_set.contains(&t.hash()) {
return Err(From::from(TransactionError::AlreadyImported));
}
let env_info = self.env_info();

View File

@ -426,9 +426,8 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
block.try_seal(self.engine.deref().deref(), seal)
}
// TODO: either work out a better API than this or refactor prepare_sealing and try_seal in terms of this.
fn with_engine<F, T>(&self, f: F) -> T where F: FnOnce(&Engine) -> T {
f(self.engine.deref().deref())
fn engine(&self) -> &Engine {
self.engine.deref().deref()
}
// TODO [todr] Should be moved to miner crate eventually.

View File

@ -40,6 +40,7 @@ use log_entry::LocalizedLogEntry;
use filter::Filter;
use error::{ImportResult, Error};
use receipt::LocalizedReceipt;
use engine::{Engine};
/// Blockchain database client. Owns and manages a blockchain and a block queue.
pub trait BlockChainClient : Sync + Send {
@ -130,6 +131,6 @@ pub trait BlockChainClient : Sync + Send {
fn call(&self, t: &SignedTransaction) -> Result<Executed, Error>;
/// Executes a function providing it with a reference to an engine.
fn with_engine<F>(&self, _f: F) where F: FnOnce(&Engine) {}
fn engine(&self) -> &Engine;
}

View File

@ -31,6 +31,7 @@ use block_queue::BlockQueueInfo;
use block::{SealedBlock, ClosedBlock};
use executive::Executed;
use error::Error;
use engine::Engine;
/// Test client.
pub struct TestBlockChainClient {
@ -249,7 +250,7 @@ impl BlockChainClient for TestBlockChainClient {
}
fn prepare_sealing(&self, _author: Address, _gas_floor_target: U256, _extra_data: Bytes, _transactions: Vec<SignedTransaction>) -> (Option<ClosedBlock>, HashSet<H256>) {
(None, vec![])
(None, HashSet::new())
}
fn try_seal(&self, block: ClosedBlock, _seal: Vec<Bytes>) -> Result<SealedBlock, ClosedBlock> {
@ -404,4 +405,8 @@ impl BlockChainClient for TestBlockChainClient {
best_block_number: self.blocks.read().unwrap().len() as BlockNumber - 1,
}
}
fn engine(&self) -> &Engine {
unimplemented!();
}
}

View File

@ -101,7 +101,7 @@ impl Engine for Ethash {
/// This assumes that all uncles are valid uncles (i.e. of at least one generation before the current).
fn on_close_block(&self, block: &mut ExecutedBlock) {
let reward = self.spec().engine_params.get("blockReward").map_or(U256::from(0u64), |a| decode(&a));
let fields = block.fields();
let fields = block.fields_mut();
// Bestow block reward
fields.state.add_balance(&fields.header.author, &(reward + reward / U256::from(32) * U256::from(fields.uncles.len())));

View File

@ -45,7 +45,7 @@
//! assert_eq!(miner.status().transactions_in_pending_queue, 0);
//!
//! // Check block for sealing
//! assert!(miner.sealing_block(client.deref()).lock().unwrap().is_some());
//! //assert!(miner.sealing_block(client.deref()).lock().unwrap().is_some());
//! }
//! ```
@ -64,7 +64,6 @@ mod transaction_queue;
pub use transaction_queue::{TransactionQueue, AccountDetails};
pub use miner::{Miner};
use std::sync::Mutex;
use util::{H256, Address, FixedHash, Bytes};
use ethcore::client::{BlockChainClient};
use ethcore::block::{ClosedBlock};
@ -99,12 +98,12 @@ pub trait MinerService : Send + Sync {
/// New chain head event. Restart mining operation.
fn update_sealing(&self, chain: &BlockChainClient);
/// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock.
fn sealing_block(&self, chain: &BlockChainClient) -> &Mutex<Option<ClosedBlock>>;
/// Submit `seal` as a valid solution for the header of `pow_hash`.
/// Will check the seal, but not actually insert the block into the chain.
fn submit_seal(&self, chain: &BlockChainClient, pow_hash: H256, seal: Vec<Bytes>) -> Result<(), Error>;
/// Get the sealing work package and if `Some`, apply some transform.
fn map_sealing_work<F, T>(&self, chain: &BlockChainClient, f: F) -> Option<T> where F: FnOnce(&ClosedBlock) -> T;
}
/// Mining status

View File

@ -15,92 +15,19 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use rayon::prelude::*;
use std::sync::{Mutex, RwLock, Arc};
use std::sync::atomic;
//use std::sync::{Mutex, RwLock, Arc};
//use std::sync::atomic;
use std::sync::atomic::AtomicBool;
use std::collections::HashSet;
//use std::collections::HashSet;
use util::{H256, U256, Address, Bytes, Uint};
use util::*;//{H256, U256, Address, Bytes, Uint, UsingQueue, HashMap};
use ethcore::views::{BlockView, HeaderView};
use ethcore::client::{BlockChainClient, BlockId};
use ethcore::block::{ClosedBlock, IsBlock};
use ethcore::error::{Error};
use ethcore::error::*;//{Error};
use ethcore::transaction::SignedTransaction;
use super::{MinerService, MinerStatus, TransactionQueue, AccountDetails};
/// Special ClosedBlock queue-like datastructure that includes the notion of
/// usage to avoid items that were queued but never used from making it into
/// the queue.
struct SealingWork {
/// Not yet being sealed by a miner, but if one asks for work, we'd prefer they do this.
would_seal: Option<ClosedBlock>,
/// Currently being sealed by miners.
being_sealed: Vec<ClosedBlock>,
}
impl SealingWork {
/// Maximum length of the queue.
const MAX_SEALING_BLOCKS_CACHE = 5,
/// Return a reference to the item at the top of the queue (or `None` if the queue is empty);
/// it doesn't constitute noting that the item is used.
fn peek_last_ref(&self) -> Option<&ClosedBlock> {
self.would_seal.as_ref().or(self.being_sealed.last().as_ref())
}
/// Return a reference to the item at the top of the queue (or `None` if the queue is empty);
/// this constitutes using the item and will remain in the queue for at least another
/// `MAX_SEALING_BLOCKS_CACHE` invocations of `push()`.
fn use_last_ref(&mut self) -> Option<&ClosedBlock> {
if let Some(x) = self.would_seal.take() {
self.being_sealed.push(x);
if self.being_sealed.len() > MAX_SEALING_BLOCKS_CACHE {
self.being_sealed.erase(0);
}
}
self.being_sealed.last().as_ref()
}
/// Place an item on the end of the queue. The previously `push()`ed item will be removed
/// if `use_last_ref()` since it was `push()`ed.
fn push(&mut self, b: ClosedBlock) {
self.would_seal = Some(b);
}
// Clears everything; the queue is entirely reset.
fn reset(&mut self) {
self.would_seal = None;
self.being_sealed.clear();
}
// Returns `Some` reference to first block that `f` returns `true` with it as a parameter.
// Returns `None` if no such block exists in the queue.
fn find_if<F>(&self, f: F) -> Option<&ClosedBlock> where F: Fn(&ClosedBlock) -> bool {
if would_seal.as_ref().map(&f).unwrap_or(false) {
would_seal.as_ref()
} else {
being_sealed.iter().find_if(f)
}
}
/// Returns the most recently pushed block if `f` returns `true` with a reference to it as
/// a parameter, otherwise `None`.
/// will not destroy a block if a reference to it has previously been returned by `use_last_ref`.
fn pending_if<F>(&mut self, f: F) -> Option<ClosedBlock> where F: Fn(&ClosedBlock) -> bool {
// a bit clumsy - TODO: think about a nicer way of expressing this.
if let Some(x) = self.would_seal.take() {
if f(&x) {
Some(x)
} else {
self.would_seal = x;
None
}
} else {
being_sealed.last().iter().cloned().filter(&b)
}
}
}
/// Keeps track of transactions using priority queue and holds currently mined block.
pub struct Miner {
transaction_queue: Mutex<TransactionQueue>,
@ -108,7 +35,7 @@ pub struct Miner {
// for sealing...
sealing_enabled: AtomicBool,
sealing_block_last_request: Mutex<u64>,
sealing_work: Mutex<SealingWork>,
sealing_work: Mutex<UsingQueue<ClosedBlock>>,
gas_floor_target: RwLock<U256>,
author: RwLock<Address>,
extra_data: RwLock<Bytes>,
@ -120,10 +47,7 @@ impl Default for Miner {
transaction_queue: Mutex::new(TransactionQueue::new()),
sealing_enabled: AtomicBool::new(false),
sealing_block_last_request: Mutex::new(0),
sealing_work: Mutex::new(SealingWork{
would_seal: None,
being_sealed: vec![],
}),
sealing_work: Mutex::new(UsingQueue::new(5)),
gas_floor_target: RwLock::new(U256::zero()),
author: RwLock::new(Address::default()),
extra_data: RwLock::new(Vec::new()),
@ -175,8 +99,8 @@ impl Miner {
/// Prepares new block for sealing including top transactions from queue.
fn prepare_sealing(&self, chain: &BlockChainClient) {
let transactions = self.transaction_queue.lock().unwrap().top_transactions();
let sealing_work = self.sealing_work.lock().unwrap();
let best_hash = chain.best_block_header().hash();
let mut sealing_work = self.sealing_work.lock().unwrap();
let best_hash = chain.best_block_header().sha3();
/*
// check to see if last ClosedBlock in would_seals is actually same parent block.
@ -187,39 +111,40 @@ impl Miner {
// otherwise, author a fresh block.
*/
let (b, invalid_transactions) = match sealing_work.pending_if(|b| b.block().fields().header().parent_hash() == best_hash) {
Some(mut old_block) => {
let (b, invalid_transactions) = match sealing_work.pop_if(|b| b.block().fields().header.parent_hash() == &best_hash) {
Some(old_block) => {
// add transactions to old_block
chain.with_engine(|e| {
let invalid_transactions = HashMap::new();
let block = old_block.reopen(e);
let e = chain.engine();
let mut invalid_transactions = HashSet::new();
let mut block = old_block.reopen(e);
let block_number = block.block().fields().header.number();
// TODO: push new uncles, too.
let mut have_one = false;
// TODO: refactor with chain.prepare_sealing
for t in transactions {
let hash = tx.hash();
let res = block.push_transaction(tx, None);
match import {
Err(Error::Execution(ExecutionError::BlockGasLimitReached { gas_limit, gas_used, .. })) => {
trace!(target: "miner", "Skipping adding transaction to block because of gas limit: {:?}", hash);
// Exit early if gas left is smaller then min_tx_gas
if gas_limit - gas_used < min_tx_gas {
break;
}
},
Err(Error::Execution(ExecutionError::AlreadyImported)) => {} // already have transaction - ignore
Err(e) => {
invalid_transactions.insert(hash);
trace!(target: "miner",
"Error adding transaction to block: number={}. transaction_hash={:?}, Error: {:?}",
block_number, hash, e);
},
_ => { have_one = true } // imported ok - note that.
}
// TODO: push new uncles, too.
let mut have_one = false;
// TODO: refactor with chain.prepare_sealing
for tx in transactions {
let hash = tx.hash();
let res = block.push_transaction(tx, None);
match res {
Err(Error::Execution(ExecutionError::BlockGasLimitReached { gas_limit, gas_used, .. })) => {
trace!(target: "miner", "Skipping adding transaction to block because of gas limit: {:?}", hash);
// Exit early if gas left is smaller then min_tx_gas
let min_tx_gas: U256 = x!(21000); // TODO: figure this out properly.
if gas_limit - gas_used < min_tx_gas {
break;
}
},
Err(Error::Transaction(TransactionError::AlreadyImported)) => {} // already have transaction - ignore
Err(e) => {
invalid_transactions.insert(hash);
trace!(target: "miner",
"Error adding transaction to block: number={}. transaction_hash={:?}, Error: {:?}",
block_number, hash, e);
},
_ => { have_one = true } // imported ok - note that.
}
(if have_one { Some(block.close()) } else { None }, invalid_transactions)
})
}
(if have_one { Some(block.close()) } else { None }, invalid_transactions)
}
None => {
// block not found - create it.
@ -230,7 +155,7 @@ impl Miner {
transactions,
)
}
}
};
let mut queue = self.transaction_queue.lock().unwrap();
queue.remove_all(
&invalid_transactions.into_iter().collect::<Vec<H256>>(),
@ -266,7 +191,7 @@ impl MinerService for Miner {
MinerStatus {
transactions_in_pending_queue: status.pending,
transactions_in_future_queue: status.future,
transactions_in_pending_block: block.peek_last_ref().map_or(0, |b| b.transactions().len()),
transactions_in_pending_block: sealing_work.peek_last_ref().map_or(0, |b| b.transactions().len()),
}
}
@ -299,7 +224,7 @@ impl MinerService for Miner {
if should_disable_sealing {
self.sealing_enabled.store(false, atomic::Ordering::Relaxed);
*self.sealing_work.lock().unwrap().reset();
self.sealing_work.lock().unwrap().reset();
} else if self.sealing_enabled.load(atomic::Ordering::Relaxed) {
self.prepare_sealing(chain);
}
@ -316,9 +241,9 @@ impl MinerService for Miner {
}
fn submit_seal(&self, chain: &BlockChainClient, pow_hash: H256, seal: Vec<Bytes>) -> Result<(), Error> {
if let Some(b) = self.sealing_work().lock().unwrap().take_if(|b| &b.hash() == &pow_hash) {
match chain.try_seal(b.unwrap(), seal) {
Err(old) => {
if let Some(b) = self.sealing_work.lock().unwrap().take_used_if(|b| &b.hash() == &pow_hash) {
match chain.try_seal(b, seal) {
Err(_) => {
Err(Error::PowInvalid)
}
Ok(sealed) => {

View File

@ -407,15 +407,12 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM>
}
let miner = take_weak!(self.miner);
miner.map_sealing_work(client.deref(), |b| match b {
Some(b) => {
let pow_hash = b.hash();
let target = Ethash::difficulty_to_boundary(b.block().header().difficulty());
let seed_hash = Ethash::get_seedhash(b.block().header().number());
to_value(&(pow_hash, seed_hash, target))
}
_ => Err(Error::internal_error())
})
miner.map_sealing_work(client.deref(), |b| {
let pow_hash = b.hash();
let target = Ethash::difficulty_to_boundary(b.block().header().difficulty());
let seed_hash = Ethash::get_seedhash(b.block().header().number());
to_value(&(pow_hash, seed_hash, target))
}).unwrap_or(Err(Error::internal_error())) // no work found.
},
_ => Err(Error::invalid_params())
}

View File

@ -68,10 +68,7 @@ impl MinerService for TestMinerService {
/// New chain head event. Restart mining operation.
fn update_sealing(&self, _chain: &BlockChainClient) { unimplemented!(); }
/// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock.
fn sealing_block(&self, _chain: &BlockChainClient) -> &Mutex<Option<ClosedBlock>> {
&self.latest_closed_block
}
fn map_sealing_work<F, T>(&self, _chain: &BlockChainClient, _f: F) -> Option<T> where F: FnOnce(&ClosedBlock) -> T { unimplemented!(); }
/// Submit `seal` as a valid solution for the header of `pow_hash`.
/// Will check the seal, but not actually insert the block into the chain.

View File

@ -78,6 +78,12 @@ impl<T> UsingQueue<T> where T: Clone {
}
}
/// Returns `Some` item which is the first that `f` returns `true` with a reference to it
/// as a parameter or `None` if no such item exists in the queue.
pub fn take_used_if<P>(&mut self, predicate: P) -> Option<T> where P: Fn(&T) -> bool {
self.in_use.iter().position(|r| predicate(r)).map(|i| self.in_use.remove(i))
}
/// Returns the most recently pushed block if `f` returns `true` with a reference to it as
/// a parameter, otherwise `None`.
/// Will not destroy a block if a reference to it has previously been returned by `use_last_ref`,