diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 90ec8693f..c3e8d942e 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -122,7 +122,7 @@ const CLIENT_DB_VER_STR: &'static str = "5.3"; impl Client { /// Create a new client with given spec and DB path. - pub fn new(config: ClientConfig, spec: Spec, path: &Path, message_channel: IoChannel ) -> Result, Error> { + pub fn new(config: ClientConfig, spec: Spec, path: &Path, message_channel: IoChannel ) -> Arc { Client::::new_with_verifier(config, spec, path, message_channel) } } @@ -146,7 +146,7 @@ pub fn append_path(path: &Path, item: &str) -> String { impl Client where V: Verifier { /// Create a new client with given spec and DB path and custom verifier. - pub fn new_with_verifier(config: ClientConfig, spec: Spec, path: &Path, message_channel: IoChannel ) -> Result>, Error> { + pub fn new_with_verifier(config: ClientConfig, spec: Spec, path: &Path, message_channel: IoChannel ) -> Arc> { let path = get_db_path(path, config.pruning, spec.genesis_header().hash()); let gb = spec.genesis_block(); let chain = Arc::new(BlockChain::new(config.blockchain, &gb, &path)); @@ -163,7 +163,7 @@ impl Client where V: Verifier { let panic_handler = PanicHandler::new_in_arc(); panic_handler.forward_from(&block_queue); - Ok(Arc::new(Client { + Arc::new(Client { chain: chain, engine: engine, state_db: Mutex::new(state_db), @@ -172,7 +172,7 @@ impl Client where V: Verifier { import_lock: Mutex::new(()), panic_handler: panic_handler, verifier: PhantomData, - })) + }) } /// Flush the block import queue. diff --git a/ethcore/src/json_tests/chain.rs b/ethcore/src/json_tests/chain.rs index a1154f6a9..9413d025a 100644 --- a/ethcore/src/json_tests/chain.rs +++ b/ethcore/src/json_tests/chain.rs @@ -53,7 +53,7 @@ pub fn json_chain_test(json_data: &[u8], era: ChainEra) -> Vec { let temp = RandomTempPath::new(); { - let client = Client::new(ClientConfig::default(), spec, temp.as_path(), IoChannel::disconnected()).unwrap(); + let client = Client::new(ClientConfig::default(), spec, temp.as_path(), IoChannel::disconnected()); for b in &blockchain.blocks_rlp() { if Block::is_good(&b) { let _ = client.import_block(b.clone()); diff --git a/ethcore/src/json_tests/executive.rs b/ethcore/src/json_tests/executive.rs index 0825f3c27..427868807 100644 --- a/ethcore/src/json_tests/executive.rs +++ b/ethcore/src/json_tests/executive.rs @@ -17,11 +17,9 @@ use super::test_common::*; use state::*; use executive::*; -use spec::*; use engine::*; use evm; use evm::{Schedule, Ext, Factory, VMType, ContractCreateResult, MessageCallResult}; -use ethereum; use externalities::*; use substate::*; use tests::helpers::*; diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 38bd873b8..2464b7cc6 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -61,7 +61,7 @@ impl ClientService { info!("Starting {}", net_service.host_info()); info!("Configured for {} using {:?} engine", spec.name, spec.engine.name()); - let client = try!(Client::new(config, spec, db_path, net_service.io().channel())); + let client = Client::new(config, spec, db_path, net_service.io().channel()); panic_handler.forward_from(client.deref()); let client_io = Arc::new(ClientIoHandler { client: client.clone() diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index 539a33d10..926009ac4 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -20,17 +20,10 @@ use tests::helpers::*; use common::*; use devtools::*; -#[test] -fn created() { - let dir = RandomTempPath::new(); - let client_result = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()); - assert!(client_result.is_ok()); -} - #[test] fn imports_from_empty() { let dir = RandomTempPath::new(); - let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()).unwrap(); + let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()); client.import_verified_blocks(&IoChannel::disconnected()); client.flush_queue(); } @@ -48,7 +41,7 @@ fn returns_state_root_basic() { #[test] fn imports_good_block() { let dir = RandomTempPath::new(); - let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()).unwrap(); + let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()); let good_block = get_good_dummy_block(); if let Err(_) = client.import_block(good_block) { panic!("error importing block being good by definition"); @@ -63,7 +56,7 @@ fn imports_good_block() { #[test] fn query_none_block() { let dir = RandomTempPath::new(); - let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()).unwrap(); + let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()); let non_existant = client.block_header(BlockId::Number(188)); assert!(non_existant.is_none()); diff --git a/ethcore/src/tests/helpers.rs b/ethcore/src/tests/helpers.rs index 152ac0ef6..56e33d76b 100644 --- a/ethcore/src/tests/helpers.rs +++ b/ethcore/src/tests/helpers.rs @@ -145,7 +145,7 @@ pub fn create_test_block_with_data(header: &Header, transactions: &[&SignedTrans pub fn generate_dummy_client(block_number: u32) -> GuardedTempResult> { let dir = RandomTempPath::new(); - let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()).unwrap(); + let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()); let test_spec = get_test_spec(); let test_engine = &test_spec.engine; let state_root = test_spec.genesis_header().state_root; @@ -211,7 +211,7 @@ pub fn push_blocks_to_client(client: &Arc, timestamp_salt: u64, starting pub fn get_test_client_with_blocks(blocks: Vec) -> GuardedTempResult> { let dir = RandomTempPath::new(); - let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()).unwrap(); + let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()); for block in &blocks { if let Err(_) = client.import_block(block.clone()) { panic!("panic importing block which is well-formed"); diff --git a/miner/src/lib.rs b/miner/src/lib.rs index e99f9217d..bcfad253e 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -38,7 +38,7 @@ //! fn main() { //! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap(); //! let dir = env::temp_dir(); -//! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, service.io().channel()).unwrap(); +//! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, service.io().channel()); //! //! let miner: Miner = Miner::default(); //! // get status @@ -100,6 +100,12 @@ pub trait MinerService : Send + Sync { /// Set the gas limit we wish to target when sealing a new block. fn set_gas_floor_target(&self, target: U256); + /// Get current transactions limit in queue. + fn transactions_limit(&self) -> usize; + + /// Set maximal number of transactions kept in the queue (both current and future). + fn set_transactions_limit(&self, limit: usize); + /// Imports transactions to transaction queue. fn import_transactions(&self, transactions: Vec, fetch_account: T) -> Vec> diff --git a/miner/src/miner.rs b/miner/src/miner.rs index 287250f04..a45b34f85 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -205,6 +205,14 @@ impl MinerService for Miner { *self.gas_floor_target.read().unwrap() / x!(5) } + fn transactions_limit(&self) -> usize { + self.transaction_queue.lock().unwrap().limit() + } + + fn set_transactions_limit(&self, limit: usize) { + self.transaction_queue.lock().unwrap().set_limit(limit) + } + /// Get the author that we will seal blocks as. fn author(&self) -> Address { *self.author.read().unwrap() diff --git a/miner/src/transaction_queue.rs b/miner/src/transaction_queue.rs index 6a1721f1d..e9f2570e3 100644 --- a/miner/src/transaction_queue.rs +++ b/miner/src/transaction_queue.rs @@ -86,7 +86,8 @@ use std::default::Default; use std::cmp::{Ordering}; -use std::collections::{HashMap, HashSet, BTreeSet}; +use std::cmp; +use std::collections::{HashMap, BTreeSet}; use util::numbers::{Uint, U256}; use util::hash::{Address, H256}; use util::table::*; @@ -204,8 +205,8 @@ impl TransactionSet { /// Remove low priority transactions if there is more then specified by given `limit`. /// /// It drops transactions from this set but also removes associated `VerifiedTransaction`. - /// Returns hashes of transactions removed because of limit. - fn enforce_limit(&mut self, by_hash: &mut HashMap) -> Option> { + /// Returns addresses and highes nonces of transactions removed because of limit. + fn enforce_limit(&mut self, by_hash: &mut HashMap) -> Option> { let len = self.by_priority.len(); if len <= self.limit { return None; @@ -221,17 +222,18 @@ impl TransactionSet { .collect() }; - Some(to_drop - .into_iter() - .map(|(sender, nonce)| { + Some(to_drop.into_iter() + .fold(HashMap::new(), |mut removed, (sender, nonce)| { let order = self.drop(&sender, &nonce) .expect("Transaction has just been found in `by_priority`; so it is in `by_address` also."); by_hash.remove(&order.hash) .expect("Hash found in `by_priorty` matches the one dropped; so it is included in `by_hash`"); - order.hash - }) - .collect::>()) + + let max = removed.get(&sender).map(|val| cmp::max(*val, nonce)).unwrap_or(nonce); + removed.insert(sender, max); + removed + })) } /// Drop transaction from this set (remove from `by_priority` and `by_address`) @@ -248,6 +250,12 @@ impl TransactionSet { self.by_priority.clear(); self.by_address.clear(); } + + /// Sets new limit for number of transactions in this `TransactionSet`. + /// Note the limit is not applied (no transactions are removed) by calling this method. + fn set_limit(&mut self, limit: usize) { + self.limit = limit; + } } #[derive(Debug)] @@ -305,20 +313,21 @@ impl Default for TransactionQueue { impl TransactionQueue { /// Creates new instance of this Queue pub fn new() -> Self { - Self::with_limits(1024, 1024) + Self::with_limit(1024) } /// Create new instance of this Queue with specified limits - pub fn with_limits(current_limit: usize, future_limit: usize) -> Self { + pub fn with_limit(limit: usize) -> Self { let current = TransactionSet { by_priority: BTreeSet::new(), by_address: Table::new(), - limit: current_limit, + limit: limit, }; + let future = TransactionSet { by_priority: BTreeSet::new(), by_address: Table::new(), - limit: future_limit, + limit: limit, }; TransactionQueue { @@ -331,6 +340,20 @@ impl TransactionQueue { } } + /// Set the new limit for `current` and `future` queue. + pub fn set_limit(&mut self, limit: usize) { + self.current.set_limit(limit); + self.future.set_limit(limit); + // And ensure the limits + self.current.enforce_limit(&mut self.by_hash); + self.future.enforce_limit(&mut self.by_hash); + } + + /// Returns current limit of transactions in the queue. + pub fn limit(&self) -> usize { + self.current.limit + } + /// Get the minimal gas price. pub fn minimal_gas_price(&self) -> &U256 { &self.minimal_gas_price @@ -595,7 +618,6 @@ impl TransactionQueue { let address = tx.sender(); let nonce = tx.nonce(); - let hash = tx.hash(); let next_nonce = self.last_nonces .get(&address) @@ -606,7 +628,7 @@ impl TransactionQueue { if nonce > next_nonce { // We have a gap - put to future try!(check_too_cheap(Self::replace_transaction(tx, next_nonce, &mut self.future, &mut self.by_hash))); - try!(check_if_removed(&hash, self.future.enforce_limit(&mut self.by_hash))); + try!(check_if_removed(&address, &nonce, self.future.enforce_limit(&mut self.by_hash))); return Ok(TransactionImportResult::Future); } else if nonce < state_nonce { // Droping transaction @@ -628,13 +650,31 @@ impl TransactionQueue { let future_tx = self.by_hash.remove(&order.hash).unwrap(); try!(check_too_cheap(Self::replace_transaction(future_tx, state_nonce, &mut self.current, &mut self.by_hash))); } + // Also enforce the limit - try!(check_if_removed(&hash, self.current.enforce_limit(&mut self.by_hash))); + let removed = self.current.enforce_limit(&mut self.by_hash); + // If some transaction were removed because of limit we need to update last_nonces also. + self.update_last_nonces(&removed); + // Trigger error if we were removed. + try!(check_if_removed(&address, &nonce, removed)); trace!(target: "miner", "status: {:?}", self.status()); Ok(TransactionImportResult::Current) } + /// Updates + fn update_last_nonces(&mut self, removed_max_nonces: &Option>) { + if let Some(ref max_nonces) = *removed_max_nonces { + for (sender, nonce) in max_nonces.iter() { + if *nonce == U256::zero() { + self.last_nonces.remove(sender); + } else { + self.last_nonces.insert(*sender, *nonce - U256::one()); + } + } + } + } + /// Replaces transaction in given set (could be `future` or `current`). /// /// If there is already transaction with same `(sender, nonce)` it will be replaced iff `gas_price` is higher. @@ -679,12 +719,15 @@ fn check_too_cheap(is_in: bool) -> Result<(), TransactionError> { } } -fn check_if_removed(hash: &H256, dropped: Option>) -> Result<(), TransactionError> { +fn check_if_removed(sender: &Address, nonce: &U256, dropped: Option>) -> Result<(), TransactionError> { match dropped { - Some(ref dropped) if dropped.contains(hash) => { - Err(TransactionError::LimitReached) + Some(ref dropped) => match dropped.get(sender) { + Some(max) if nonce <= max => { + Err(TransactionError::LimitReached) + }, + _ => Ok(()), }, - _ => Ok(()) + _ => Ok(()), } } @@ -1155,8 +1198,10 @@ mod test { #[test] fn should_drop_old_transactions_when_hitting_the_limit() { // given - let mut txq = TransactionQueue::with_limits(1, 1); + let mut txq = TransactionQueue::with_limit(1); let (tx, tx2) = new_txs(U256::one()); + let sender = tx.sender().unwrap(); + let nonce = tx.nonce; txq.add(tx.clone(), &default_nonce).unwrap(); assert_eq!(txq.status().pending, 1); @@ -1169,11 +1214,35 @@ mod test { assert_eq!(txq.status().pending, 1); assert_eq!(t.len(), 1); assert_eq!(t[0], tx); + assert_eq!(txq.last_nonce(&sender), Some(nonce)); + } + + #[test] + fn should_return_correct_nonces_when_dropped_because_of_limit() { + // given + let mut txq = TransactionQueue::with_limit(2); + let tx = new_tx(); + let (tx1, tx2) = new_txs(U256::one()); + let sender = tx1.sender().unwrap(); + let nonce = tx1.nonce; + txq.add(tx1.clone(), &default_nonce).unwrap(); + txq.add(tx2.clone(), &default_nonce).unwrap(); + assert_eq!(txq.status().pending, 2); + assert_eq!(txq.last_nonce(&sender), Some(nonce + U256::one())); + + // when + let res = txq.add(tx.clone(), &default_nonce); + + // then + assert_eq!(res.unwrap(), TransactionImportResult::Current); + assert_eq!(txq.status().pending, 2); + assert_eq!(txq.last_nonce(&sender), Some(nonce)); } #[test] fn should_limit_future_transactions() { - let mut txq = TransactionQueue::with_limits(10, 1); + let mut txq = TransactionQueue::with_limit(1); + txq.current.set_limit(10); let (tx1, tx2) = new_txs_with_gas_price_diff(U256::from(4), U256::from(1)); let (tx3, tx4) = new_txs_with_gas_price_diff(U256::from(4), U256::from(2)); txq.add(tx1.clone(), &default_nonce).unwrap(); diff --git a/parity/main.rs b/parity/main.rs index e5e1da864..dff7ceaeb 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -172,6 +172,8 @@ Sealing/Mining Options: [default: 0037a6b811ffeb6e072da21179d11b1406371c63]. --extra-data STRING Specify a custom extra-data for authored blocks, no more than 32 characters. + --tx-limit LIMIT Limit of transactions kept in the queue (waiting to + be included in next block) [default: 1024]. Footprint Options: --pruning METHOD Configure pruning of the state/storage trie. METHOD @@ -259,6 +261,7 @@ struct Args { flag_usd_per_eth: String, flag_gas_floor_target: String, flag_extra_data: Option, + flag_tx_limit: usize, flag_logging: Option, flag_version: bool, // geth-compatibility... @@ -727,6 +730,7 @@ impl Configuration { miner.set_gas_floor_target(self.gas_floor_target()); miner.set_extra_data(self.extra_data()); miner.set_minimal_gas_price(self.gas_price()); + miner.set_transactions_limit(self.args.flag_tx_limit); // Sync let sync = EthSync::register(service.network(), sync_config, client.clone(), miner.clone()); diff --git a/rpc/src/v1/impls/ethcore.rs b/rpc/src/v1/impls/ethcore.rs index 36116cfaa..533aad80e 100644 --- a/rpc/src/v1/impls/ethcore.rs +++ b/rpc/src/v1/impls/ethcore.rs @@ -70,6 +70,17 @@ impl Ethcore for EthcoreClient where M: MinerService + 'static { }) } + fn set_transactions_limit(&self, params: Params) -> Result { + from_params::<(usize,)>(params).and_then(|(limit,)| { + take_weak!(self.miner).set_transactions_limit(limit); + to_value(&true) + }) + } + + fn transactions_limit(&self, _: Params) -> Result { + to_value(&take_weak!(self.miner).transactions_limit()) + } + fn min_gas_price(&self, _: Params) -> Result { to_value(&take_weak!(self.miner).minimal_gas_price()) } diff --git a/rpc/src/v1/tests/ethcore.rs b/rpc/src/v1/tests/ethcore.rs index 51542e9dc..43a131bab 100644 --- a/rpc/src/v1/tests/ethcore.rs +++ b/rpc/src/v1/tests/ethcore.rs @@ -153,6 +153,32 @@ fn rpc_ethcore_dev_logs_levels() { let request = r#"{"jsonrpc": "2.0", "method": "ethcore_devLogsLevels", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"rpc=trace","id":1}"#; + assert_eq!(io.handle_request(request), Some(response.to_owned())); +} + +fn rpc_ethcore_set_transactions_limit() { + let miner = miner_service(); + let ethcore = EthcoreClient::new(&miner, logger()).to_delegate(); + let io = IoHandler::new(); + io.add_delegate(ethcore); + + let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setTransactionsLimit", "params":[10240240], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; + + assert_eq!(io.handle_request(request), Some(response.to_owned())); + assert_eq!(miner.transactions_limit(), 10_240_240); +} + + +#[test] +fn rpc_ethcore_transactions_limit() { + let miner = miner_service(); + let ethcore = EthcoreClient::new(&miner, logger()).to_delegate(); + let io = IoHandler::new(); + io.add_delegate(ethcore); + + let request = r#"{"jsonrpc": "2.0", "method": "ethcore_transactionsLimit", "params":[], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","result":1024,"id":1}"#; assert_eq!(io.handle_request(request), Some(response.to_owned())); } diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs index 927b8ed50..7b323b198 100644 --- a/rpc/src/v1/tests/helpers/miner_service.rs +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -39,6 +39,7 @@ pub struct TestMinerService { gas_floor_target: RwLock, author: RwLock
, extra_data: RwLock, + limit: RwLock, } impl Default for TestMinerService { @@ -52,6 +53,7 @@ impl Default for TestMinerService { gas_floor_target: RwLock::new(U256::from(12345)), author: RwLock::new(Address::zero()), extra_data: RwLock::new(vec![1, 2, 3, 4]), + limit: RwLock::new(1024), } } } @@ -84,6 +86,14 @@ impl MinerService for TestMinerService { *self.min_gas_price.write().unwrap() = min_gas_price; } + fn set_transactions_limit(&self, limit: usize) { + *self.limit.write().unwrap() = limit; + } + + fn transactions_limit(&self) -> usize { + *self.limit.read().unwrap() + } + fn author(&self) -> Address { *self.author.read().unwrap() } diff --git a/rpc/src/v1/traits/ethcore.rs b/rpc/src/v1/traits/ethcore.rs index b8e881408..f00d7bf9a 100644 --- a/rpc/src/v1/traits/ethcore.rs +++ b/rpc/src/v1/traits/ethcore.rs @@ -33,6 +33,12 @@ pub trait Ethcore: Sized + Send + Sync + 'static { /// Sets new author for mined block. fn set_author(&self, _: Params) -> Result { rpc_unimplemented!() } + /// Sets the limits for transaction queue. + fn set_transactions_limit(&self, _: Params) -> Result { rpc_unimplemented!() } + + /// Returns current transactions limit. + fn transactions_limit(&self, _: Params) -> Result { rpc_unimplemented!() } + /// Returns mining extra data. fn extra_data(&self, _: Params) -> Result { rpc_unimplemented!() } @@ -55,12 +61,15 @@ pub trait Ethcore: Sized + Send + Sync + 'static { delegate.add_method("ethcore_setGasFloorTarget", Ethcore::set_gas_floor_target); delegate.add_method("ethcore_setExtraData", Ethcore::set_extra_data); delegate.add_method("ethcore_setAuthor", Ethcore::set_author); + delegate.add_method("ethcore_setTransactionsLimit", Ethcore::set_transactions_limit); delegate.add_method("ethcore_extraData", Ethcore::extra_data); delegate.add_method("ethcore_gasFloorTarget", Ethcore::gas_floor_target); delegate.add_method("ethcore_minGasPrice", Ethcore::min_gas_price); + delegate.add_method("ethcore_transactionsLimit", Ethcore::transactions_limit); delegate.add_method("ethcore_devLogs", Ethcore::dev_logs); delegate.add_method("ethcore_devLogsLevels", Ethcore::dev_logs_levels); + delegate } } diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 6e900e97e..0b95f0b92 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -44,7 +44,7 @@ //! fn main() { //! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap(); //! let dir = env::temp_dir(); -//! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, service.io().channel()).unwrap(); +//! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, service.io().channel()); //! let miner = Miner::new(false); //! EthSync::register(&mut service, SyncConfig::default(), client, miner); //! }