Merge pull request #753 from ethcore/tx_queue_live

Refactoring error transaction_queue error handling and `update_sealing` method.
This commit is contained in:
Gav Wood 2016-03-18 23:54:22 +01:00
commit d16558eb83
7 changed files with 69 additions and 59 deletions

View File

@ -65,6 +65,10 @@ pub enum ExecutionError {
#[derive(Debug)] #[derive(Debug)]
/// Errors concerning transaction processing. /// Errors concerning transaction processing.
pub enum TransactionError { pub enum TransactionError {
/// Transaction is already imported to the queue
AlreadyImported,
/// Transaction is not valid anymore (state already has higher nonce)
Old,
/// Transaction's gas price is below threshold. /// Transaction's gas price is below threshold.
InsufficientGasPrice { InsufficientGasPrice {
/// Minimal expected gas price /// Minimal expected gas price

View File

@ -79,7 +79,7 @@ pub trait MinerService : Send + Sync {
fn status(&self) -> MinerStatus; fn status(&self) -> MinerStatus;
/// Imports transactions to transaction queue. /// Imports transactions to transaction queue.
fn import_transactions<T>(&self, transactions: Vec<SignedTransaction>, fetch_account: T) -> Result<(), Error> fn import_transactions<T>(&self, transactions: Vec<SignedTransaction>, fetch_account: T) -> Vec<Result<(), Error>>
where T: Fn(&Address) -> AccountDetails; where T: Fn(&Address) -> AccountDetails;
/// Returns hashes of transactions currently in pending /// Returns hashes of transactions currently in pending
@ -92,7 +92,7 @@ pub trait MinerService : Send + Sync {
fn chain_new_blocks(&self, chain: &BlockChainClient, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256]); fn chain_new_blocks(&self, chain: &BlockChainClient, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256]);
/// New chain head event. Restart mining operation. /// New chain head event. Restart mining operation.
fn prepare_sealing(&self, chain: &BlockChainClient); fn update_sealing(&self, chain: &BlockChainClient);
/// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock. /// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock.
fn sealing_block(&self, chain: &BlockChainClient) -> &Mutex<Option<ClosedBlock>>; fn sealing_block(&self, chain: &BlockChainClient) -> &Mutex<Option<ClosedBlock>>;

View File

@ -95,42 +95,8 @@ impl Miner {
self.transaction_queue.lock().unwrap().set_minimal_gas_price(min_gas_price); self.transaction_queue.lock().unwrap().set_minimal_gas_price(min_gas_price);
} }
fn update_gas_limit(&self, chain: &BlockChainClient) { /// Prepares new block for sealing including top transactions from queue.
let gas_limit = HeaderView::new(&chain.best_block_header()).gas_limit(); pub fn prepare_sealing(&self, chain: &BlockChainClient) {
let mut queue = self.transaction_queue.lock().unwrap();
queue.set_gas_limit(gas_limit);
}
}
impl MinerService for Miner {
fn clear_and_reset(&self, chain: &BlockChainClient) {
self.transaction_queue.lock().unwrap().clear();
self.prepare_sealing(chain);
}
fn status(&self) -> MinerStatus {
let status = self.transaction_queue.lock().unwrap().status();
let block = self.sealing_block.lock().unwrap();
MinerStatus {
transactions_in_pending_queue: status.pending,
transactions_in_future_queue: status.future,
transactions_in_pending_block: block.as_ref().map_or(0, |b| b.transactions().len()),
}
}
fn import_transactions<T>(&self, transactions: Vec<SignedTransaction>, fetch_account: T) -> Result<(), Error>
where T: Fn(&Address) -> AccountDetails {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.add_all(transactions, fetch_account)
}
fn pending_transactions_hashes(&self) -> Vec<H256> {
let transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.pending_hashes()
}
fn prepare_sealing(&self, chain: &BlockChainClient) {
let transactions = self.transaction_queue.lock().unwrap().top_transactions(); let transactions = self.transaction_queue.lock().unwrap().top_transactions();
let b = chain.prepare_sealing( let b = chain.prepare_sealing(
self.author(), self.author(),
@ -152,6 +118,47 @@ impl MinerService for Miner {
}); });
} }
fn update_gas_limit(&self, chain: &BlockChainClient) {
let gas_limit = HeaderView::new(&chain.best_block_header()).gas_limit();
let mut queue = self.transaction_queue.lock().unwrap();
queue.set_gas_limit(gas_limit);
}
}
impl MinerService for Miner {
fn clear_and_reset(&self, chain: &BlockChainClient) {
self.transaction_queue.lock().unwrap().clear();
self.update_sealing(chain);
}
fn status(&self) -> MinerStatus {
let status = self.transaction_queue.lock().unwrap().status();
let block = self.sealing_block.lock().unwrap();
MinerStatus {
transactions_in_pending_queue: status.pending,
transactions_in_future_queue: status.future,
transactions_in_pending_block: block.as_ref().map_or(0, |b| b.transactions().len()),
}
}
fn import_transactions<T>(&self, transactions: Vec<SignedTransaction>, fetch_account: T) -> Vec<Result<(), Error>>
where T: Fn(&Address) -> AccountDetails {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.add_all(transactions, fetch_account)
}
fn pending_transactions_hashes(&self) -> Vec<H256> {
let transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.pending_hashes()
}
fn update_sealing(&self, chain: &BlockChainClient) {
if self.sealing_enabled.load(atomic::Ordering::Relaxed) {
self.prepare_sealing(chain);
}
}
fn sealing_block(&self, chain: &BlockChainClient) -> &Mutex<Option<ClosedBlock>> { fn sealing_block(&self, chain: &BlockChainClient) -> &Mutex<Option<ClosedBlock>> {
if self.sealing_block.lock().unwrap().is_none() { if self.sealing_block.lock().unwrap().is_none() {
self.sealing_enabled.store(true, atomic::Ordering::Relaxed); self.sealing_enabled.store(true, atomic::Ordering::Relaxed);
@ -239,9 +246,6 @@ impl MinerService for Miner {
}); });
} }
// Update mined block self.update_sealing(chain);
if self.sealing_enabled.load(atomic::Ordering::Relaxed) {
self.prepare_sealing(chain);
}
} }
} }

View File

@ -333,12 +333,12 @@ impl TransactionQueue {
} }
/// Adds all signed transactions to queue to be verified and imported /// Adds all signed transactions to queue to be verified and imported
pub fn add_all<T>(&mut self, txs: Vec<SignedTransaction>, fetch_account: T) -> Result<(), Error> pub fn add_all<T>(&mut self, txs: Vec<SignedTransaction>, fetch_account: T) -> Vec<Result<(), Error>>
where T: Fn(&Address) -> AccountDetails { where T: Fn(&Address) -> AccountDetails {
for tx in txs.into_iter() {
try!(self.add(tx, &fetch_account)); txs.into_iter()
} .map(|tx| self.add(tx, &fetch_account))
Ok(()) .collect()
} }
/// Add signed transaction to queue to be verified and imported /// Add signed transaction to queue to be verified and imported
@ -385,8 +385,7 @@ impl TransactionQueue {
})); }));
} }
self.import_tx(vtx, account.nonce); self.import_tx(vtx, account.nonce).map_err(Error::Transaction)
Ok(())
} }
/// Removes all transactions identified by hashes given in slice /// Removes all transactions identified by hashes given in slice
@ -540,12 +539,14 @@ impl TransactionQueue {
/// ///
/// It ignores transactions that has already been imported (same `hash`) and replaces the transaction /// It ignores transactions that has already been imported (same `hash`) and replaces the transaction
/// iff `(address, nonce)` is the same but `gas_price` is higher. /// iff `(address, nonce)` is the same but `gas_price` is higher.
fn import_tx(&mut self, tx: VerifiedTransaction, state_nonce: U256) { ///
/// Returns `true` when transaction was imported successfuly
fn import_tx(&mut self, tx: VerifiedTransaction, state_nonce: U256) -> Result<(), TransactionError> {
if self.by_hash.get(&tx.hash()).is_some() { if self.by_hash.get(&tx.hash()).is_some() {
// Transaction is already imported. // Transaction is already imported.
trace!(target: "miner", "Dropping already imported transaction: {:?}", tx.hash()); trace!(target: "miner", "Dropping already imported transaction: {:?}", tx.hash());
return; return Err(TransactionError::AlreadyImported);
} }
@ -562,11 +563,11 @@ impl TransactionQueue {
// We have a gap - put to future // We have a gap - put to future
Self::replace_transaction(tx, next_nonce, &mut self.future, &mut self.by_hash); Self::replace_transaction(tx, next_nonce, &mut self.future, &mut self.by_hash);
self.future.enforce_limit(&mut self.by_hash); self.future.enforce_limit(&mut self.by_hash);
return; return Ok(());
} else if nonce < state_nonce { } else if nonce < state_nonce {
// Droping transaction // Droping transaction
trace!(target: "miner", "Dropping old transaction: {:?} (nonce: {} < {})", tx.hash(), nonce, next_nonce); trace!(target: "miner", "Dropping old transaction: {:?} (nonce: {} < {})", tx.hash(), nonce, next_nonce);
return; return Err(TransactionError::Old);
} }
Self::replace_transaction(tx, state_nonce, &mut self.current, &mut self.by_hash); Self::replace_transaction(tx, state_nonce, &mut self.current, &mut self.by_hash);
@ -576,6 +577,7 @@ impl TransactionQueue {
self.current.enforce_limit(&mut self.by_hash); self.current.enforce_limit(&mut self.by_hash);
trace!(target: "miner", "status: {:?}", self.status()); trace!(target: "miner", "status: {:?}", self.status());
Ok(())
} }
/// Replaces transaction in given set (could be `future` or `current`). /// Replaces transaction in given set (could be `future` or `current`).
@ -1008,7 +1010,7 @@ mod test {
let fetch_last_nonce = |_a: &Address| AccountDetails{ nonce: last_nonce, balance: !U256::zero() }; let fetch_last_nonce = |_a: &Address| AccountDetails{ nonce: last_nonce, balance: !U256::zero() };
// when // when
txq.add(tx, &fetch_last_nonce).unwrap(); txq.add(tx, &fetch_last_nonce).unwrap_err();
// then // then
let stats = txq.status(); let stats = txq.status();
@ -1028,7 +1030,7 @@ mod test {
assert_eq!(txq.status().pending, 0); assert_eq!(txq.status().pending, 0);
// when // when
txq.add(tx2.clone(), &nonce).unwrap(); txq.add(tx2.clone(), &nonce).unwrap_err();
// then // then
let stats = txq.status(); let stats = txq.status();

View File

@ -385,7 +385,7 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM>
nonce: client.nonce(a), nonce: client.nonce(a),
balance: client.balance(a), balance: client.balance(a),
}); });
match import { match import.into_iter().collect::<Result<Vec<_>, _>>() {
Ok(_) => to_value(&hash), Ok(_) => to_value(&hash),
Err(e) => { Err(e) => {
warn!("Error sending transaction: {:?}", e); warn!("Error sending transaction: {:?}", e);

View File

@ -48,7 +48,7 @@ impl MinerService for TestMinerService {
} }
/// Imports transactions to transaction queue. /// Imports transactions to transaction queue.
fn import_transactions<T>(&self, _transactions: Vec<SignedTransaction>, _fetch_account: T) -> Result<(), Error> fn import_transactions<T>(&self, _transactions: Vec<SignedTransaction>, _fetch_account: T) -> Vec<Result<(), Error>>
where T: Fn(&Address) -> AccountDetails { unimplemented!(); } where T: Fn(&Address) -> AccountDetails { unimplemented!(); }
/// Returns hashes of transactions currently in pending /// Returns hashes of transactions currently in pending
@ -61,7 +61,7 @@ impl MinerService for TestMinerService {
fn chain_new_blocks(&self, _chain: &BlockChainClient, _imported: &[H256], _invalid: &[H256], _enacted: &[H256], _retracted: &[H256]) { unimplemented!(); } fn chain_new_blocks(&self, _chain: &BlockChainClient, _imported: &[H256], _invalid: &[H256], _enacted: &[H256], _retracted: &[H256]) { unimplemented!(); }
/// New chain head event. Restart mining operation. /// New chain head event. Restart mining operation.
fn prepare_sealing(&self, _chain: &BlockChainClient) { unimplemented!(); } fn update_sealing(&self, _chain: &BlockChainClient) { unimplemented!(); }
/// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock. /// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock.
fn sealing_block(&self, _chain: &BlockChainClient) -> &Mutex<Option<ClosedBlock>> { fn sealing_block(&self, _chain: &BlockChainClient) -> &Mutex<Option<ClosedBlock>> {

View File

@ -1297,7 +1297,7 @@ impl ChainSync {
} }
pub fn chain_new_head(&mut self, io: &mut SyncIo) { pub fn chain_new_head(&mut self, io: &mut SyncIo) {
self.miner.prepare_sealing(io.chain()); self.miner.update_sealing(io.chain());
} }
} }