From 9741d48496171b387732ca37a2b8f222fbc3983a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 10 Mar 2016 15:35:36 +0100 Subject: [PATCH 1/3] Transaction data associated with polls. --- rpc/src/v1/helpers/poll_manager.rs | 104 +++++++++++++++++++++++++++-- 1 file changed, 100 insertions(+), 4 deletions(-) diff --git a/rpc/src/v1/helpers/poll_manager.rs b/rpc/src/v1/helpers/poll_manager.rs index 36a6352c2..6c0862633 100644 --- a/rpc/src/v1/helpers/poll_manager.rs +++ b/rpc/src/v1/helpers/poll_manager.rs @@ -16,6 +16,8 @@ //! Indexes all rpc poll requests. +use util::hash::H256; +use std::collections::HashMap; use transient_hashmap::{TransientHashMap, Timer, StandardTimer}; /// Lifetime of poll (in seconds). @@ -43,7 +45,8 @@ impl Clone for PollInfo where F: Clone { /// Lazily garbage collects unused polls info. pub struct PollManager where T: Timer { polls: TransientHashMap, T>, - next_available_id: PollId + transactions_data: HashMap>, + next_available_id: PollId, } impl PollManager { @@ -57,15 +60,25 @@ impl PollManager where T: Timer { pub fn new_with_timer(timer: T) -> Self { PollManager { polls: TransientHashMap::new_with_timer(POLL_LIFETIME, timer), + transactions_data: HashMap::new(), next_available_id: 0, } } + fn prune(&mut self) { + self.polls.prune(); + // self.polls.prune() + // .into_iter() + // .map(|key| { + // self.transactions_data.remove(key); + // }); + } + /// Returns id which can be used for new poll. /// /// Stores information when last poll happend. pub fn create_poll(&mut self, filter: F, block: BlockNumber) -> PollId { - self.polls.prune(); + self.prune(); let id = self.next_available_id; self.next_available_id += 1; self.polls.insert(id, PollInfo { @@ -77,7 +90,7 @@ impl PollManager where T: Timer { /// Updates information when last poll happend. pub fn update_poll(&mut self, id: &PollId, block: BlockNumber) { - self.polls.prune(); + self.prune(); if let Some(info) = self.polls.get_mut(id) { info.block_number = block; } @@ -85,13 +98,27 @@ impl PollManager where T: Timer { /// Returns number of block when last poll happend. pub fn get_poll_info(&mut self, id: &PollId) -> Option<&PollInfo> { - self.polls.prune(); + self.prune(); self.polls.get(id) } + pub fn set_poll_transactions(&mut self, id: &PollId, transactions: Vec) { + self.prune(); + if self.polls.get(id).is_some() { + self.transactions_data.insert(*id, transactions); + } + } + + /// Returns last transactions hashes for given poll. + pub fn poll_transactions(&mut self, id: &PollId) -> Option<&Vec> { + self.prune(); + self.transactions_data.get(id) + } + /// Removes poll info. pub fn remove_poll(&mut self, id: &PollId) { self.polls.remove(id); + self.transactions_data.remove(id); } } @@ -100,6 +127,7 @@ mod tests { use std::cell::RefCell; use transient_hashmap::Timer; use v1::helpers::PollManager; + use util::hash::H256; struct TestTimer<'a> { time: &'a RefCell, @@ -141,4 +169,72 @@ mod tests { indexer.remove_poll(&1); assert!(indexer.get_poll_info(&1).is_none()); } + + #[test] + fn should_return_poll_transactions_hashes() { + // given + let mut indexer = PollManager::new(); + let poll_id = indexer.create_poll(false, 20); + assert!(indexer.poll_transactions(&poll_id).is_none()); + let transactions = vec![H256::from(1), H256::from(2)]; + + // when + indexer.set_poll_transactions(&poll_id, transactions.clone()); + + // then + let txs = indexer.poll_transactions(&poll_id); + assert_eq!(txs.unwrap(), &transactions); + } + + + #[test] + fn should_remove_transaction_data_when_poll_timed_out() { + // given + let time = RefCell::new(0); + let timer = TestTimer { + time: &time, + }; + let mut indexer = PollManager::new_with_timer(timer); + let poll_id = indexer.create_poll(false, 20); + let transactions = vec![H256::from(1), H256::from(2)]; + indexer.set_poll_transactions(&poll_id, transactions.clone()); + assert!(indexer.poll_transactions(&poll_id).is_some()); + + // when + *time.borrow_mut() = 75; + indexer.prune(); + + // then + assert!(indexer.poll_transactions(&poll_id).is_none()); + + } + + #[test] + fn should_remove_transaction_data_when_poll_is_removed() { + // given + let mut indexer = PollManager::new(); + let poll_id = indexer.create_poll(false, 20); + let transactions = vec![H256::from(1), H256::from(2)]; + + // when + indexer.set_poll_transactions(&poll_id, transactions.clone()); + assert!(indexer.poll_transactions(&poll_id).is_some()); + indexer.remove_poll(&poll_id); + + // then + assert!(indexer.poll_transactions(&poll_id).is_none()); + } + + #[test] + fn should_ignore_transactions_for_invalid_poll_id() { + // given + let mut indexer = PollManager::<()>::new(); + let transactions = vec![H256::from(1), H256::from(2)]; + + // when + indexer.set_poll_transactions(&5, transactions.clone()); + + // then + assert!(indexer.poll_transactions(&5).is_none()); + } } From c37370a8a777f503307d341381cd00f3fc27ff08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 10 Mar 2016 16:00:55 +0100 Subject: [PATCH 2/3] PendingTransaction filter. --- miner/src/miner.rs | 8 ++++++++ miner/src/transaction_queue.rs | 26 ++++++++++++++++++++++++ parity/main.rs | 2 +- rpc/src/v1/helpers/poll_manager.rs | 32 +++++++++++++++++------------- rpc/src/v1/impls/eth.rs | 32 ++++++++++++++++++++++++------ 5 files changed, 79 insertions(+), 21 deletions(-) diff --git a/miner/src/miner.rs b/miner/src/miner.rs index 8e93defcf..85dbc6bbc 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -34,6 +34,9 @@ pub trait MinerService { fn import_transactions(&self, transactions: Vec, fetch_nonce: T) -> Result<(), Error> where T: Fn(&Address) -> U256; + /// Returns hashes of transactions currently in pending + fn pending_transactions_hashes(&self) -> Vec; + /// Removes all transactions from the queue and restart mining operation. fn clear_and_reset(&self, chain: &BlockChainClient); @@ -135,6 +138,11 @@ impl MinerService for Miner { transaction_queue.add_all(transactions, fetch_nonce) } + fn pending_transactions_hashes(&self) -> Vec { + let transaction_queue = self.transaction_queue.lock().unwrap(); + transaction_queue.pending_hashes() + } + fn prepare_sealing(&self, chain: &BlockChainClient) { let no_of_transactions = 128; let transactions = self.transaction_queue.lock().unwrap().top_transactions(no_of_transactions); diff --git a/miner/src/transaction_queue.rs b/miner/src/transaction_queue.rs index f64bd7318..4379531b2 100644 --- a/miner/src/transaction_queue.rs +++ b/miner/src/transaction_queue.rs @@ -431,6 +431,14 @@ impl TransactionQueue { .collect() } + /// Returns hashes of all transactions from current, ordered by priority. + pub fn pending_hashes(&self) -> Vec { + self.current.by_priority + .iter() + .map(|t| t.hash) + .collect() + } + /// Removes all elements (in any state) from the queue pub fn clear(&mut self) { self.current.clear(); @@ -693,6 +701,24 @@ mod test { assert_eq!(top.len(), 2); } + #[test] + fn should_return_pending_hashes() { + // given + let mut txq = TransactionQueue::new(); + + let (tx, tx2) = new_txs(U256::from(1)); + + // when + txq.add(tx.clone(), &default_nonce).unwrap(); + txq.add(tx2.clone(), &default_nonce).unwrap(); + + // then + let top = txq.pending_hashes(); + assert_eq!(top[0], tx.hash()); + assert_eq!(top[1], tx2.hash()); + assert_eq!(top.len(), 2); + } + #[test] fn should_put_transaction_to_futures_if_gap_detected() { // given diff --git a/parity/main.rs b/parity/main.rs index c73f971d9..d83fe680d 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -209,7 +209,7 @@ fn setup_rpc_server(client: Arc, sync: Arc, miner: Arc, "net" => server.add_delegate(NetClient::new(&sync).to_delegate()), "eth" => { server.add_delegate(EthClient::new(&client, &sync, &miner).to_delegate()); - server.add_delegate(EthFilterClient::new(&client).to_delegate()); + server.add_delegate(EthFilterClient::new(&client, &miner).to_delegate()); } _ => { die!("{}: Invalid API name to be enabled.", api); diff --git a/rpc/src/v1/helpers/poll_manager.rs b/rpc/src/v1/helpers/poll_manager.rs index 6c0862633..73b273a8f 100644 --- a/rpc/src/v1/helpers/poll_manager.rs +++ b/rpc/src/v1/helpers/poll_manager.rs @@ -102,15 +102,19 @@ impl PollManager where T: Timer { self.polls.get(id) } - pub fn set_poll_transactions(&mut self, id: &PollId, transactions: Vec) { + pub fn update_transactions(&mut self, id: &PollId, transactions: Vec) -> Option> { self.prune(); if self.polls.get(id).is_some() { - self.transactions_data.insert(*id, transactions); + self.transactions_data.insert(*id, transactions) + } else { + None } } + // Normal code always replaces transactions + #[cfg(test)] /// Returns last transactions hashes for given poll. - pub fn poll_transactions(&mut self, id: &PollId) -> Option<&Vec> { + pub fn transactions(&mut self, id: &PollId) -> Option<&Vec> { self.prune(); self.transactions_data.get(id) } @@ -175,14 +179,14 @@ mod tests { // given let mut indexer = PollManager::new(); let poll_id = indexer.create_poll(false, 20); - assert!(indexer.poll_transactions(&poll_id).is_none()); + assert!(indexer.transactions(&poll_id).is_none()); let transactions = vec![H256::from(1), H256::from(2)]; // when - indexer.set_poll_transactions(&poll_id, transactions.clone()); + indexer.update_transactions(&poll_id, transactions.clone()); // then - let txs = indexer.poll_transactions(&poll_id); + let txs = indexer.transactions(&poll_id); assert_eq!(txs.unwrap(), &transactions); } @@ -197,15 +201,15 @@ mod tests { let mut indexer = PollManager::new_with_timer(timer); let poll_id = indexer.create_poll(false, 20); let transactions = vec![H256::from(1), H256::from(2)]; - indexer.set_poll_transactions(&poll_id, transactions.clone()); - assert!(indexer.poll_transactions(&poll_id).is_some()); + indexer.update_transactions(&poll_id, transactions.clone()); + assert!(indexer.transactions(&poll_id).is_some()); // when *time.borrow_mut() = 75; indexer.prune(); // then - assert!(indexer.poll_transactions(&poll_id).is_none()); + assert!(indexer.transactions(&poll_id).is_none()); } @@ -217,12 +221,12 @@ mod tests { let transactions = vec![H256::from(1), H256::from(2)]; // when - indexer.set_poll_transactions(&poll_id, transactions.clone()); - assert!(indexer.poll_transactions(&poll_id).is_some()); + indexer.update_transactions(&poll_id, transactions.clone()); + assert!(indexer.transactions(&poll_id).is_some()); indexer.remove_poll(&poll_id); // then - assert!(indexer.poll_transactions(&poll_id).is_none()); + assert!(indexer.transactions(&poll_id).is_none()); } #[test] @@ -232,9 +236,9 @@ mod tests { let transactions = vec![H256::from(1), H256::from(2)]; // when - indexer.set_poll_transactions(&5, transactions.clone()); + indexer.update_transactions(&5, transactions.clone()); // then - assert!(indexer.poll_transactions(&5).is_none()); + assert!(indexer.transactions(&5).is_none()); } } diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index a9ee389f8..5c7df574d 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . //! Eth rpc implementation. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::{Arc, Weak, Mutex, RwLock}; use std::ops::Deref; use ethsync::{EthSync, SyncState}; @@ -264,15 +264,17 @@ impl Eth for EthClient { /// Eth filter rpc implementation. pub struct EthFilterClient { client: Weak, + miner: Weak, polls: Mutex>, } impl EthFilterClient { /// Creates new Eth filter client. - pub fn new(client: &Arc) -> Self { + pub fn new(client: &Arc, miner: &Arc) -> Self { EthFilterClient { client: Arc::downgrade(client), - polls: Mutex::new(PollManager::new()) + miner: Arc::downgrade(miner), + polls: Mutex::new(PollManager::new()), } } } @@ -302,7 +304,12 @@ impl EthFilter for EthFilterClient { match params { Params::None => { let mut polls = self.polls.lock().unwrap(); - let id = polls.create_poll(PollFilter::PendingTransaction, take_weak!(self.client).chain_info().best_block_number); + let best_block_number = take_weak!(self.client).chain_info().best_block_number; + let pending_transactions = take_weak!(self.miner).pending_transactions_hashes(); + + let id = polls.create_poll(PollFilter::PendingTransaction, best_block_number); + polls.update_transactions(&id, pending_transactions); + to_value(&U256::from(id)) }, _ => Err(Error::invalid_params()) @@ -330,8 +337,21 @@ impl EthFilter for EthFilterClient { to_value(&hashes) }, PollFilter::PendingTransaction => { - // TODO: fix implementation once TransactionQueue is merged - to_value(&vec![] as &Vec) + let poll_id = index.value(); + let mut polls = self.polls.lock().unwrap(); + + let current_hashes = take_weak!(self.miner).pending_transactions_hashes(); + let previous_hashes = polls.update_transactions(&poll_id, current_hashes.clone()).unwrap(); + polls.update_poll(&poll_id, client.chain_info().best_block_number); + + // calculate diff + let previous_hashes_set = previous_hashes.into_iter().collect::>(); + let diff = current_hashes + .into_iter() + .filter(|hash| previous_hashes_set.contains(&hash)) + .collect::>(); + + to_value(&diff) }, PollFilter::Logs(mut filter) => { filter.from_block = BlockId::Number(info.block_number); From dd2fb4df67307c4ce8e632d7ab009937422888af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 11 Mar 2016 12:31:42 +0100 Subject: [PATCH 3/3] Storing BlockNumber & transactions directly in enum --- rpc/src/v1/helpers/poll_filter.rs | 9 +- rpc/src/v1/helpers/poll_manager.rs | 183 ++++++----------------------- rpc/src/v1/impls/eth.rs | 45 ++++--- 3 files changed, 59 insertions(+), 178 deletions(-) diff --git a/rpc/src/v1/helpers/poll_filter.rs b/rpc/src/v1/helpers/poll_filter.rs index 465290270..f9ed6230c 100644 --- a/rpc/src/v1/helpers/poll_filter.rs +++ b/rpc/src/v1/helpers/poll_filter.rs @@ -1,10 +1,13 @@ //! Helper type with all filter possibilities. +use util::hash::H256; use ethcore::filter::Filter; +pub type BlockNumber = u64; + #[derive(Clone)] pub enum PollFilter { - Block, - PendingTransaction, - Logs(Filter) + Block(BlockNumber), + PendingTransaction(Vec), + Logs(BlockNumber, Filter) } diff --git a/rpc/src/v1/helpers/poll_manager.rs b/rpc/src/v1/helpers/poll_manager.rs index 765410567..9735d7d5d 100644 --- a/rpc/src/v1/helpers/poll_manager.rs +++ b/rpc/src/v1/helpers/poll_manager.rs @@ -16,36 +16,18 @@ //! Indexes all rpc poll requests. -use util::hash::H256; -use std::collections::HashMap; use transient_hashmap::{TransientHashMap, Timer, StandardTimer}; /// Lifetime of poll (in seconds). const POLL_LIFETIME: u64 = 60; pub type PollId = usize; -pub type BlockNumber = u64; - -pub struct PollInfo { - pub filter: F, - pub block_number: BlockNumber -} - -impl Clone for PollInfo where F: Clone { - fn clone(&self) -> Self { - PollInfo { - filter: self.filter.clone(), - block_number: self.block_number.clone() - } - } -} /// Indexes all poll requests. /// /// Lazily garbage collects unused polls info. pub struct PollManager where T: Timer { - polls: TransientHashMap, T>, - transactions_data: HashMap>, + polls: TransientHashMap, next_available_id: PollId, } @@ -57,188 +39,89 @@ impl PollManager { } impl PollManager where T: Timer { + pub fn new_with_timer(timer: T) -> Self { PollManager { polls: TransientHashMap::new_with_timer(POLL_LIFETIME, timer), - transactions_data: HashMap::new(), next_available_id: 0, } } - fn prune(&mut self) { - self.polls.prune(); - // self.polls.prune() - // .into_iter() - // .map(|key| { - // self.transactions_data.remove(key); - // }); - } - /// Returns id which can be used for new poll. /// /// Stores information when last poll happend. - pub fn create_poll(&mut self, filter: F, block: BlockNumber) -> PollId { - self.prune(); + pub fn create_poll(&mut self, filter: F) -> PollId { + self.polls.prune(); + let id = self.next_available_id; + self.polls.insert(id, filter); + self.next_available_id += 1; - self.polls.insert(id, PollInfo { - filter: filter, - block_number: block, - }); id } - /// Updates information when last poll happend. - pub fn update_poll(&mut self, id: &PollId, block: BlockNumber) { - self.prune(); - if let Some(info) = self.polls.get_mut(id) { - info.block_number = block; - } - } - - /// Returns number of block when last poll happend. - pub fn poll_info(&mut self, id: &PollId) -> Option<&PollInfo> { - self.prune(); + // Implementation is always using `poll_mut` + #[cfg(test)] + /// Get a reference to stored poll filter + pub fn poll(&mut self, id: &PollId) -> Option<&F> { + self.polls.prune(); self.polls.get(id) } - pub fn update_transactions(&mut self, id: &PollId, transactions: Vec) -> Option> { - self.prune(); - if self.polls.get(id).is_some() { - self.transactions_data.insert(*id, transactions) - } else { - None - } - } - - // Normal code always replaces transactions - #[cfg(test)] - /// Returns last transactions hashes for given poll. - pub fn transactions(&mut self, id: &PollId) -> Option<&Vec> { - self.prune(); - self.transactions_data.get(id) + /// Get a mutable reference to stored poll filter + pub fn poll_mut(&mut self, id: &PollId) -> Option<&mut F> { + self.polls.prune(); + self.polls.get_mut(id) } /// Removes poll info. pub fn remove_poll(&mut self, id: &PollId) { self.polls.remove(id); - self.transactions_data.remove(id); } } #[cfg(test)] mod tests { - use std::cell::RefCell; + use std::cell::Cell; use transient_hashmap::Timer; use v1::helpers::PollManager; - use util::hash::H256; struct TestTimer<'a> { - time: &'a RefCell, + time: &'a Cell, } impl<'a> Timer for TestTimer<'a> { fn get_time(&self) -> i64 { - *self.time.borrow() + self.time.get() } } #[test] fn test_poll_indexer() { - let time = RefCell::new(0); + let time = Cell::new(0); let timer = TestTimer { time: &time, }; let mut indexer = PollManager::new_with_timer(timer); - assert_eq!(indexer.create_poll(false, 20), 0); - assert_eq!(indexer.create_poll(true, 20), 1); + assert_eq!(indexer.create_poll(20), 0); + assert_eq!(indexer.create_poll(20), 1); - *time.borrow_mut() = 10; - indexer.update_poll(&0, 21); - assert_eq!(indexer.poll_info(&0).unwrap().filter, false); - assert_eq!(indexer.poll_info(&0).unwrap().block_number, 21); + time.set(10); + *indexer.poll_mut(&0).unwrap() = 21; + assert_eq!(*indexer.poll(&0).unwrap(), 21); + assert_eq!(*indexer.poll(&1).unwrap(), 20); - *time.borrow_mut() = 30; - indexer.update_poll(&1, 23); - assert_eq!(indexer.poll_info(&1).unwrap().filter, true); - assert_eq!(indexer.poll_info(&1).unwrap().block_number, 23); + time.set(30); + *indexer.poll_mut(&1).unwrap() = 23; + assert_eq!(*indexer.poll(&1).unwrap(), 23); - *time.borrow_mut() = 75; - indexer.update_poll(&0, 30); - assert!(indexer.poll_info(&0).is_none()); - assert_eq!(indexer.poll_info(&1).unwrap().filter, true); - assert_eq!(indexer.poll_info(&1).unwrap().block_number, 23); + time.set(75); + assert!(indexer.poll(&0).is_none()); + assert_eq!(*indexer.poll(&1).unwrap(), 23); indexer.remove_poll(&1); - assert!(indexer.poll_info(&1).is_none()); + assert!(indexer.poll(&1).is_none()); } - #[test] - fn should_return_poll_transactions_hashes() { - // given - let mut indexer = PollManager::new(); - let poll_id = indexer.create_poll(false, 20); - assert!(indexer.transactions(&poll_id).is_none()); - let transactions = vec![H256::from(1), H256::from(2)]; - - // when - indexer.update_transactions(&poll_id, transactions.clone()); - - // then - let txs = indexer.transactions(&poll_id); - assert_eq!(txs.unwrap(), &transactions); - } - - - #[test] - fn should_remove_transaction_data_when_poll_timed_out() { - // given - let time = RefCell::new(0); - let timer = TestTimer { - time: &time, - }; - let mut indexer = PollManager::new_with_timer(timer); - let poll_id = indexer.create_poll(false, 20); - let transactions = vec![H256::from(1), H256::from(2)]; - indexer.update_transactions(&poll_id, transactions.clone()); - assert!(indexer.transactions(&poll_id).is_some()); - - // when - *time.borrow_mut() = 75; - indexer.prune(); - - // then - assert!(indexer.transactions(&poll_id).is_none()); - - } - - #[test] - fn should_remove_transaction_data_when_poll_is_removed() { - // given - let mut indexer = PollManager::new(); - let poll_id = indexer.create_poll(false, 20); - let transactions = vec![H256::from(1), H256::from(2)]; - - // when - indexer.update_transactions(&poll_id, transactions.clone()); - assert!(indexer.transactions(&poll_id).is_some()); - indexer.remove_poll(&poll_id); - - // then - assert!(indexer.transactions(&poll_id).is_none()); - } - - #[test] - fn should_ignore_transactions_for_invalid_poll_id() { - // given - let mut indexer = PollManager::<()>::new(); - let transactions = vec![H256::from(1), H256::from(2)]; - - // when - indexer.update_transactions(&5, transactions.clone()); - - // then - assert!(indexer.transactions(&5).is_none()); - } } diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 211c46304..9f81caa90 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -301,7 +301,8 @@ impl EthFilter for EthFilterClient from_params::<(Filter,)>(params) .and_then(|(filter,)| { let mut polls = self.polls.lock().unwrap(); - let id = polls.create_poll(PollFilter::Logs(filter.into()), take_weak!(self.client).chain_info().best_block_number); + let block_number = take_weak!(self.client).chain_info().best_block_number; + let id = polls.create_poll(PollFilter::Logs(block_number, filter.into())); to_value(&U256::from(id)) }) } @@ -310,7 +311,7 @@ impl EthFilter for EthFilterClient match params { Params::None => { let mut polls = self.polls.lock().unwrap(); - let id = polls.create_poll(PollFilter::Block, take_weak!(self.client).chain_info().best_block_number); + let id = polls.create_poll(PollFilter::Block(take_weak!(self.client).chain_info().best_block_number)); to_value(&U256::from(id)) }, _ => Err(Error::invalid_params()) @@ -321,11 +322,8 @@ impl EthFilter for EthFilterClient match params { Params::None => { let mut polls = self.polls.lock().unwrap(); - let best_block_number = take_weak!(self.client).chain_info().best_block_number; let pending_transactions = take_weak!(self.miner).pending_transactions_hashes(); - - let id = polls.create_poll(PollFilter::PendingTransaction, best_block_number); - polls.update_transactions(&id, pending_transactions); + let id = polls.create_poll(PollFilter::PendingTransaction(pending_transactions)); to_value(&U256::from(id)) }, @@ -337,50 +335,47 @@ impl EthFilter for EthFilterClient let client = take_weak!(self.client); from_params::<(Index,)>(params) .and_then(|(index,)| { - let info = self.polls.lock().unwrap().poll_info(&index.value()).cloned(); - match info { + let mut polls = self.polls.lock().unwrap(); + match polls.poll_mut(&index.value()) { None => Ok(Value::Array(vec![] as Vec)), - Some(info) => match info.filter { - PollFilter::Block => { + Some(filter) => match *filter { + PollFilter::Block(ref mut block_number) => { // + 1, cause we want to return hashes including current block hash. let current_number = client.chain_info().best_block_number + 1; - let hashes = (info.block_number..current_number).into_iter() + let hashes = (*block_number..current_number).into_iter() .map(BlockId::Number) .filter_map(|id| client.block_hash(id)) .collect::>(); - self.polls.lock().unwrap().update_poll(&index.value(), current_number); + *block_number = current_number; to_value(&hashes) }, - PollFilter::PendingTransaction => { - let poll_id = index.value(); - let mut polls = self.polls.lock().unwrap(); - + PollFilter::PendingTransaction(ref mut previous_hashes) => { let current_hashes = take_weak!(self.miner).pending_transactions_hashes(); - let previous_hashes = polls.update_transactions(&poll_id, current_hashes.clone()).unwrap(); - polls.update_poll(&poll_id, client.chain_info().best_block_number); - // calculate diff - let previous_hashes_set = previous_hashes.into_iter().collect::>(); + let previous_hashes_set = previous_hashes.into_iter().map(|h| h.clone()).collect::>(); let diff = current_hashes - .into_iter() + .iter() .filter(|hash| previous_hashes_set.contains(&hash)) + .cloned() .collect::>(); + *previous_hashes = current_hashes; + to_value(&diff) }, - PollFilter::Logs(mut filter) => { - filter.from_block = BlockId::Number(info.block_number); + PollFilter::Logs(ref mut block_number, ref mut filter) => { + filter.from_block = BlockId::Number(*block_number); filter.to_block = BlockId::Latest; - let logs = client.logs(filter) + let logs = client.logs(filter.clone()) .into_iter() .map(From::from) .collect::>(); let current_number = client.chain_info().best_block_number; - self.polls.lock().unwrap().update_poll(&index.value(), current_number); + *block_number = current_number; to_value(&logs) } }