diff --git a/miner/src/miner.rs b/miner/src/miner.rs index 8e93defcf..85dbc6bbc 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -34,6 +34,9 @@ pub trait MinerService { fn import_transactions(&self, transactions: Vec, fetch_nonce: T) -> Result<(), Error> where T: Fn(&Address) -> U256; + /// Returns hashes of transactions currently in pending + fn pending_transactions_hashes(&self) -> Vec; + /// Removes all transactions from the queue and restart mining operation. fn clear_and_reset(&self, chain: &BlockChainClient); @@ -135,6 +138,11 @@ impl MinerService for Miner { transaction_queue.add_all(transactions, fetch_nonce) } + fn pending_transactions_hashes(&self) -> Vec { + let transaction_queue = self.transaction_queue.lock().unwrap(); + transaction_queue.pending_hashes() + } + fn prepare_sealing(&self, chain: &BlockChainClient) { let no_of_transactions = 128; let transactions = self.transaction_queue.lock().unwrap().top_transactions(no_of_transactions); diff --git a/miner/src/transaction_queue.rs b/miner/src/transaction_queue.rs index f64bd7318..4379531b2 100644 --- a/miner/src/transaction_queue.rs +++ b/miner/src/transaction_queue.rs @@ -431,6 +431,14 @@ impl TransactionQueue { .collect() } + /// Returns hashes of all transactions from current, ordered by priority. + pub fn pending_hashes(&self) -> Vec { + self.current.by_priority + .iter() + .map(|t| t.hash) + .collect() + } + /// Removes all elements (in any state) from the queue pub fn clear(&mut self) { self.current.clear(); @@ -693,6 +701,24 @@ mod test { assert_eq!(top.len(), 2); } + #[test] + fn should_return_pending_hashes() { + // given + let mut txq = TransactionQueue::new(); + + let (tx, tx2) = new_txs(U256::from(1)); + + // when + txq.add(tx.clone(), &default_nonce).unwrap(); + txq.add(tx2.clone(), &default_nonce).unwrap(); + + // then + let top = txq.pending_hashes(); + assert_eq!(top[0], tx.hash()); + assert_eq!(top[1], tx2.hash()); + assert_eq!(top.len(), 2); + } + #[test] fn should_put_transaction_to_futures_if_gap_detected() { // given diff --git a/parity/main.rs b/parity/main.rs index c73f971d9..d83fe680d 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -209,7 +209,7 @@ fn setup_rpc_server(client: Arc, sync: Arc, miner: Arc, "net" => server.add_delegate(NetClient::new(&sync).to_delegate()), "eth" => { server.add_delegate(EthClient::new(&client, &sync, &miner).to_delegate()); - server.add_delegate(EthFilterClient::new(&client).to_delegate()); + server.add_delegate(EthFilterClient::new(&client, &miner).to_delegate()); } _ => { die!("{}: Invalid API name to be enabled.", api); diff --git a/rpc/src/v1/helpers/poll_manager.rs b/rpc/src/v1/helpers/poll_manager.rs index 6c0862633..73b273a8f 100644 --- a/rpc/src/v1/helpers/poll_manager.rs +++ b/rpc/src/v1/helpers/poll_manager.rs @@ -102,15 +102,19 @@ impl PollManager where T: Timer { self.polls.get(id) } - pub fn set_poll_transactions(&mut self, id: &PollId, transactions: Vec) { + pub fn update_transactions(&mut self, id: &PollId, transactions: Vec) -> Option> { self.prune(); if self.polls.get(id).is_some() { - self.transactions_data.insert(*id, transactions); + self.transactions_data.insert(*id, transactions) + } else { + None } } + // Normal code always replaces transactions + #[cfg(test)] /// Returns last transactions hashes for given poll. - pub fn poll_transactions(&mut self, id: &PollId) -> Option<&Vec> { + pub fn transactions(&mut self, id: &PollId) -> Option<&Vec> { self.prune(); self.transactions_data.get(id) } @@ -175,14 +179,14 @@ mod tests { // given let mut indexer = PollManager::new(); let poll_id = indexer.create_poll(false, 20); - assert!(indexer.poll_transactions(&poll_id).is_none()); + assert!(indexer.transactions(&poll_id).is_none()); let transactions = vec![H256::from(1), H256::from(2)]; // when - indexer.set_poll_transactions(&poll_id, transactions.clone()); + indexer.update_transactions(&poll_id, transactions.clone()); // then - let txs = indexer.poll_transactions(&poll_id); + let txs = indexer.transactions(&poll_id); assert_eq!(txs.unwrap(), &transactions); } @@ -197,15 +201,15 @@ mod tests { let mut indexer = PollManager::new_with_timer(timer); let poll_id = indexer.create_poll(false, 20); let transactions = vec![H256::from(1), H256::from(2)]; - indexer.set_poll_transactions(&poll_id, transactions.clone()); - assert!(indexer.poll_transactions(&poll_id).is_some()); + indexer.update_transactions(&poll_id, transactions.clone()); + assert!(indexer.transactions(&poll_id).is_some()); // when *time.borrow_mut() = 75; indexer.prune(); // then - assert!(indexer.poll_transactions(&poll_id).is_none()); + assert!(indexer.transactions(&poll_id).is_none()); } @@ -217,12 +221,12 @@ mod tests { let transactions = vec![H256::from(1), H256::from(2)]; // when - indexer.set_poll_transactions(&poll_id, transactions.clone()); - assert!(indexer.poll_transactions(&poll_id).is_some()); + indexer.update_transactions(&poll_id, transactions.clone()); + assert!(indexer.transactions(&poll_id).is_some()); indexer.remove_poll(&poll_id); // then - assert!(indexer.poll_transactions(&poll_id).is_none()); + assert!(indexer.transactions(&poll_id).is_none()); } #[test] @@ -232,9 +236,9 @@ mod tests { let transactions = vec![H256::from(1), H256::from(2)]; // when - indexer.set_poll_transactions(&5, transactions.clone()); + indexer.update_transactions(&5, transactions.clone()); // then - assert!(indexer.poll_transactions(&5).is_none()); + assert!(indexer.transactions(&5).is_none()); } } diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index a9ee389f8..5c7df574d 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . //! Eth rpc implementation. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::{Arc, Weak, Mutex, RwLock}; use std::ops::Deref; use ethsync::{EthSync, SyncState}; @@ -264,15 +264,17 @@ impl Eth for EthClient { /// Eth filter rpc implementation. pub struct EthFilterClient { client: Weak, + miner: Weak, polls: Mutex>, } impl EthFilterClient { /// Creates new Eth filter client. - pub fn new(client: &Arc) -> Self { + pub fn new(client: &Arc, miner: &Arc) -> Self { EthFilterClient { client: Arc::downgrade(client), - polls: Mutex::new(PollManager::new()) + miner: Arc::downgrade(miner), + polls: Mutex::new(PollManager::new()), } } } @@ -302,7 +304,12 @@ impl EthFilter for EthFilterClient { match params { Params::None => { let mut polls = self.polls.lock().unwrap(); - let id = polls.create_poll(PollFilter::PendingTransaction, take_weak!(self.client).chain_info().best_block_number); + let best_block_number = take_weak!(self.client).chain_info().best_block_number; + let pending_transactions = take_weak!(self.miner).pending_transactions_hashes(); + + let id = polls.create_poll(PollFilter::PendingTransaction, best_block_number); + polls.update_transactions(&id, pending_transactions); + to_value(&U256::from(id)) }, _ => Err(Error::invalid_params()) @@ -330,8 +337,21 @@ impl EthFilter for EthFilterClient { to_value(&hashes) }, PollFilter::PendingTransaction => { - // TODO: fix implementation once TransactionQueue is merged - to_value(&vec![] as &Vec) + let poll_id = index.value(); + let mut polls = self.polls.lock().unwrap(); + + let current_hashes = take_weak!(self.miner).pending_transactions_hashes(); + let previous_hashes = polls.update_transactions(&poll_id, current_hashes.clone()).unwrap(); + polls.update_poll(&poll_id, client.chain_info().best_block_number); + + // calculate diff + let previous_hashes_set = previous_hashes.into_iter().collect::>(); + let diff = current_hashes + .into_iter() + .filter(|hash| previous_hashes_set.contains(&hash)) + .collect::>(); + + to_value(&diff) }, PollFilter::Logs(mut filter) => { filter.from_block = BlockId::Number(info.block_number);