PendingTransaction filter.

This commit is contained in:
Tomasz Drwięga
2016-03-10 16:00:55 +01:00
parent 9741d48496
commit c37370a8a7
5 changed files with 79 additions and 21 deletions

View File

@@ -102,15 +102,19 @@ impl<F, T> PollManager<F, T> where T: Timer {
self.polls.get(id)
}
pub fn set_poll_transactions(&mut self, id: &PollId, transactions: Vec<H256>) {
pub fn update_transactions(&mut self, id: &PollId, transactions: Vec<H256>) -> Option<Vec<H256>> {
self.prune();
if self.polls.get(id).is_some() {
self.transactions_data.insert(*id, transactions);
self.transactions_data.insert(*id, transactions)
} else {
None
}
}
// Normal code always replaces transactions
#[cfg(test)]
/// Returns last transactions hashes for given poll.
pub fn poll_transactions(&mut self, id: &PollId) -> Option<&Vec<H256>> {
pub fn transactions(&mut self, id: &PollId) -> Option<&Vec<H256>> {
self.prune();
self.transactions_data.get(id)
}
@@ -175,14 +179,14 @@ mod tests {
// given
let mut indexer = PollManager::new();
let poll_id = indexer.create_poll(false, 20);
assert!(indexer.poll_transactions(&poll_id).is_none());
assert!(indexer.transactions(&poll_id).is_none());
let transactions = vec![H256::from(1), H256::from(2)];
// when
indexer.set_poll_transactions(&poll_id, transactions.clone());
indexer.update_transactions(&poll_id, transactions.clone());
// then
let txs = indexer.poll_transactions(&poll_id);
let txs = indexer.transactions(&poll_id);
assert_eq!(txs.unwrap(), &transactions);
}
@@ -197,15 +201,15 @@ mod tests {
let mut indexer = PollManager::new_with_timer(timer);
let poll_id = indexer.create_poll(false, 20);
let transactions = vec![H256::from(1), H256::from(2)];
indexer.set_poll_transactions(&poll_id, transactions.clone());
assert!(indexer.poll_transactions(&poll_id).is_some());
indexer.update_transactions(&poll_id, transactions.clone());
assert!(indexer.transactions(&poll_id).is_some());
// when
*time.borrow_mut() = 75;
indexer.prune();
// then
assert!(indexer.poll_transactions(&poll_id).is_none());
assert!(indexer.transactions(&poll_id).is_none());
}
@@ -217,12 +221,12 @@ mod tests {
let transactions = vec![H256::from(1), H256::from(2)];
// when
indexer.set_poll_transactions(&poll_id, transactions.clone());
assert!(indexer.poll_transactions(&poll_id).is_some());
indexer.update_transactions(&poll_id, transactions.clone());
assert!(indexer.transactions(&poll_id).is_some());
indexer.remove_poll(&poll_id);
// then
assert!(indexer.poll_transactions(&poll_id).is_none());
assert!(indexer.transactions(&poll_id).is_none());
}
#[test]
@@ -232,9 +236,9 @@ mod tests {
let transactions = vec![H256::from(1), H256::from(2)];
// when
indexer.set_poll_transactions(&5, transactions.clone());
indexer.update_transactions(&5, transactions.clone());
// then
assert!(indexer.poll_transactions(&5).is_none());
assert!(indexer.transactions(&5).is_none());
}
}

View File

@@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Eth rpc implementation.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Weak, Mutex, RwLock};
use std::ops::Deref;
use ethsync::{EthSync, SyncState};
@@ -264,15 +264,17 @@ impl Eth for EthClient {
/// Eth filter rpc implementation.
pub struct EthFilterClient {
client: Weak<Client>,
miner: Weak<Miner>,
polls: Mutex<PollManager<PollFilter>>,
}
impl EthFilterClient {
/// Creates new Eth filter client.
pub fn new(client: &Arc<Client>) -> Self {
pub fn new(client: &Arc<Client>, miner: &Arc<Miner>) -> Self {
EthFilterClient {
client: Arc::downgrade(client),
polls: Mutex::new(PollManager::new())
miner: Arc::downgrade(miner),
polls: Mutex::new(PollManager::new()),
}
}
}
@@ -302,7 +304,12 @@ impl EthFilter for EthFilterClient {
match params {
Params::None => {
let mut polls = self.polls.lock().unwrap();
let id = polls.create_poll(PollFilter::PendingTransaction, take_weak!(self.client).chain_info().best_block_number);
let best_block_number = take_weak!(self.client).chain_info().best_block_number;
let pending_transactions = take_weak!(self.miner).pending_transactions_hashes();
let id = polls.create_poll(PollFilter::PendingTransaction, best_block_number);
polls.update_transactions(&id, pending_transactions);
to_value(&U256::from(id))
},
_ => Err(Error::invalid_params())
@@ -330,8 +337,21 @@ impl EthFilter for EthFilterClient {
to_value(&hashes)
},
PollFilter::PendingTransaction => {
// TODO: fix implementation once TransactionQueue is merged
to_value(&vec![] as &Vec<H256>)
let poll_id = index.value();
let mut polls = self.polls.lock().unwrap();
let current_hashes = take_weak!(self.miner).pending_transactions_hashes();
let previous_hashes = polls.update_transactions(&poll_id, current_hashes.clone()).unwrap();
polls.update_poll(&poll_id, client.chain_info().best_block_number);
// calculate diff
let previous_hashes_set = previous_hashes.into_iter().collect::<HashSet<H256>>();
let diff = current_hashes
.into_iter()
.filter(|hash| previous_hashes_set.contains(&hash))
.collect::<Vec<H256>>();
to_value(&diff)
},
PollFilter::Logs(mut filter) => {
filter.from_block = BlockId::Number(info.block_number);