Storing BlockNumber & transactions directly in enum

This commit is contained in:
Tomasz Drwięga 2016-03-11 12:31:42 +01:00
parent 2fd036b073
commit dd2fb4df67
3 changed files with 59 additions and 178 deletions

View File

@ -1,10 +1,13 @@
//! Helper type with all filter possibilities. //! Helper type with all filter possibilities.
use util::hash::H256;
use ethcore::filter::Filter; use ethcore::filter::Filter;
pub type BlockNumber = u64;
#[derive(Clone)] #[derive(Clone)]
pub enum PollFilter { pub enum PollFilter {
Block, Block(BlockNumber),
PendingTransaction, PendingTransaction(Vec<H256>),
Logs(Filter) Logs(BlockNumber, Filter)
} }

View File

@ -16,36 +16,18 @@
//! Indexes all rpc poll requests. //! Indexes all rpc poll requests.
use util::hash::H256;
use std::collections::HashMap;
use transient_hashmap::{TransientHashMap, Timer, StandardTimer}; use transient_hashmap::{TransientHashMap, Timer, StandardTimer};
/// Lifetime of poll (in seconds). /// Lifetime of poll (in seconds).
const POLL_LIFETIME: u64 = 60; const POLL_LIFETIME: u64 = 60;
pub type PollId = usize; pub type PollId = usize;
pub type BlockNumber = u64;
pub struct PollInfo<F> {
pub filter: F,
pub block_number: BlockNumber
}
impl<F> Clone for PollInfo<F> where F: Clone {
fn clone(&self) -> Self {
PollInfo {
filter: self.filter.clone(),
block_number: self.block_number.clone()
}
}
}
/// Indexes all poll requests. /// Indexes all poll requests.
/// ///
/// Lazily garbage collects unused polls info. /// Lazily garbage collects unused polls info.
pub struct PollManager<F, T = StandardTimer> where T: Timer { pub struct PollManager<F, T = StandardTimer> where T: Timer {
polls: TransientHashMap<PollId, PollInfo<F>, T>, polls: TransientHashMap<PollId, F, T>,
transactions_data: HashMap<PollId, Vec<H256>>,
next_available_id: PollId, next_available_id: PollId,
} }
@ -57,188 +39,89 @@ impl<F> PollManager<F, StandardTimer> {
} }
impl<F, T> PollManager<F, T> where T: Timer { impl<F, T> PollManager<F, T> where T: Timer {
pub fn new_with_timer(timer: T) -> Self { pub fn new_with_timer(timer: T) -> Self {
PollManager { PollManager {
polls: TransientHashMap::new_with_timer(POLL_LIFETIME, timer), polls: TransientHashMap::new_with_timer(POLL_LIFETIME, timer),
transactions_data: HashMap::new(),
next_available_id: 0, next_available_id: 0,
} }
} }
fn prune(&mut self) {
self.polls.prune();
// self.polls.prune()
// .into_iter()
// .map(|key| {
// self.transactions_data.remove(key);
// });
}
/// Returns id which can be used for new poll. /// Returns id which can be used for new poll.
/// ///
/// Stores information when last poll happend. /// Stores information when last poll happend.
pub fn create_poll(&mut self, filter: F, block: BlockNumber) -> PollId { pub fn create_poll(&mut self, filter: F) -> PollId {
self.prune(); self.polls.prune();
let id = self.next_available_id; let id = self.next_available_id;
self.polls.insert(id, filter);
self.next_available_id += 1; self.next_available_id += 1;
self.polls.insert(id, PollInfo {
filter: filter,
block_number: block,
});
id id
} }
/// Updates information when last poll happend. // Implementation is always using `poll_mut`
pub fn update_poll(&mut self, id: &PollId, block: BlockNumber) { #[cfg(test)]
self.prune(); /// Get a reference to stored poll filter
if let Some(info) = self.polls.get_mut(id) { pub fn poll(&mut self, id: &PollId) -> Option<&F> {
info.block_number = block; self.polls.prune();
}
}
/// Returns number of block when last poll happend.
pub fn poll_info(&mut self, id: &PollId) -> Option<&PollInfo<F>> {
self.prune();
self.polls.get(id) self.polls.get(id)
} }
pub fn update_transactions(&mut self, id: &PollId, transactions: Vec<H256>) -> Option<Vec<H256>> { /// Get a mutable reference to stored poll filter
self.prune(); pub fn poll_mut(&mut self, id: &PollId) -> Option<&mut F> {
if self.polls.get(id).is_some() { self.polls.prune();
self.transactions_data.insert(*id, transactions) self.polls.get_mut(id)
} else {
None
}
}
// Normal code always replaces transactions
#[cfg(test)]
/// Returns last transactions hashes for given poll.
pub fn transactions(&mut self, id: &PollId) -> Option<&Vec<H256>> {
self.prune();
self.transactions_data.get(id)
} }
/// Removes poll info. /// Removes poll info.
pub fn remove_poll(&mut self, id: &PollId) { pub fn remove_poll(&mut self, id: &PollId) {
self.polls.remove(id); self.polls.remove(id);
self.transactions_data.remove(id);
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::cell::RefCell; use std::cell::Cell;
use transient_hashmap::Timer; use transient_hashmap::Timer;
use v1::helpers::PollManager; use v1::helpers::PollManager;
use util::hash::H256;
struct TestTimer<'a> { struct TestTimer<'a> {
time: &'a RefCell<i64>, time: &'a Cell<i64>,
} }
impl<'a> Timer for TestTimer<'a> { impl<'a> Timer for TestTimer<'a> {
fn get_time(&self) -> i64 { fn get_time(&self) -> i64 {
*self.time.borrow() self.time.get()
} }
} }
#[test] #[test]
fn test_poll_indexer() { fn test_poll_indexer() {
let time = RefCell::new(0); let time = Cell::new(0);
let timer = TestTimer { let timer = TestTimer {
time: &time, time: &time,
}; };
let mut indexer = PollManager::new_with_timer(timer); let mut indexer = PollManager::new_with_timer(timer);
assert_eq!(indexer.create_poll(false, 20), 0); assert_eq!(indexer.create_poll(20), 0);
assert_eq!(indexer.create_poll(true, 20), 1); assert_eq!(indexer.create_poll(20), 1);
*time.borrow_mut() = 10; time.set(10);
indexer.update_poll(&0, 21); *indexer.poll_mut(&0).unwrap() = 21;
assert_eq!(indexer.poll_info(&0).unwrap().filter, false); assert_eq!(*indexer.poll(&0).unwrap(), 21);
assert_eq!(indexer.poll_info(&0).unwrap().block_number, 21); assert_eq!(*indexer.poll(&1).unwrap(), 20);
*time.borrow_mut() = 30; time.set(30);
indexer.update_poll(&1, 23); *indexer.poll_mut(&1).unwrap() = 23;
assert_eq!(indexer.poll_info(&1).unwrap().filter, true); assert_eq!(*indexer.poll(&1).unwrap(), 23);
assert_eq!(indexer.poll_info(&1).unwrap().block_number, 23);
*time.borrow_mut() = 75; time.set(75);
indexer.update_poll(&0, 30); assert!(indexer.poll(&0).is_none());
assert!(indexer.poll_info(&0).is_none()); assert_eq!(*indexer.poll(&1).unwrap(), 23);
assert_eq!(indexer.poll_info(&1).unwrap().filter, true);
assert_eq!(indexer.poll_info(&1).unwrap().block_number, 23);
indexer.remove_poll(&1); indexer.remove_poll(&1);
assert!(indexer.poll_info(&1).is_none()); assert!(indexer.poll(&1).is_none());
} }
#[test]
fn should_return_poll_transactions_hashes() {
// given
let mut indexer = PollManager::new();
let poll_id = indexer.create_poll(false, 20);
assert!(indexer.transactions(&poll_id).is_none());
let transactions = vec![H256::from(1), H256::from(2)];
// when
indexer.update_transactions(&poll_id, transactions.clone());
// then
let txs = indexer.transactions(&poll_id);
assert_eq!(txs.unwrap(), &transactions);
}
#[test]
fn should_remove_transaction_data_when_poll_timed_out() {
// given
let time = RefCell::new(0);
let timer = TestTimer {
time: &time,
};
let mut indexer = PollManager::new_with_timer(timer);
let poll_id = indexer.create_poll(false, 20);
let transactions = vec![H256::from(1), H256::from(2)];
indexer.update_transactions(&poll_id, transactions.clone());
assert!(indexer.transactions(&poll_id).is_some());
// when
*time.borrow_mut() = 75;
indexer.prune();
// then
assert!(indexer.transactions(&poll_id).is_none());
}
#[test]
fn should_remove_transaction_data_when_poll_is_removed() {
// given
let mut indexer = PollManager::new();
let poll_id = indexer.create_poll(false, 20);
let transactions = vec![H256::from(1), H256::from(2)];
// when
indexer.update_transactions(&poll_id, transactions.clone());
assert!(indexer.transactions(&poll_id).is_some());
indexer.remove_poll(&poll_id);
// then
assert!(indexer.transactions(&poll_id).is_none());
}
#[test]
fn should_ignore_transactions_for_invalid_poll_id() {
// given
let mut indexer = PollManager::<()>::new();
let transactions = vec![H256::from(1), H256::from(2)];
// when
indexer.update_transactions(&5, transactions.clone());
// then
assert!(indexer.transactions(&5).is_none());
}
} }

View File

@ -301,7 +301,8 @@ impl<C, M> EthFilter for EthFilterClient<C, M>
from_params::<(Filter,)>(params) from_params::<(Filter,)>(params)
.and_then(|(filter,)| { .and_then(|(filter,)| {
let mut polls = self.polls.lock().unwrap(); let mut polls = self.polls.lock().unwrap();
let id = polls.create_poll(PollFilter::Logs(filter.into()), take_weak!(self.client).chain_info().best_block_number); let block_number = take_weak!(self.client).chain_info().best_block_number;
let id = polls.create_poll(PollFilter::Logs(block_number, filter.into()));
to_value(&U256::from(id)) to_value(&U256::from(id))
}) })
} }
@ -310,7 +311,7 @@ impl<C, M> EthFilter for EthFilterClient<C, M>
match params { match params {
Params::None => { Params::None => {
let mut polls = self.polls.lock().unwrap(); let mut polls = self.polls.lock().unwrap();
let id = polls.create_poll(PollFilter::Block, take_weak!(self.client).chain_info().best_block_number); let id = polls.create_poll(PollFilter::Block(take_weak!(self.client).chain_info().best_block_number));
to_value(&U256::from(id)) to_value(&U256::from(id))
}, },
_ => Err(Error::invalid_params()) _ => Err(Error::invalid_params())
@ -321,11 +322,8 @@ impl<C, M> EthFilter for EthFilterClient<C, M>
match params { match params {
Params::None => { Params::None => {
let mut polls = self.polls.lock().unwrap(); let mut polls = self.polls.lock().unwrap();
let best_block_number = take_weak!(self.client).chain_info().best_block_number;
let pending_transactions = take_weak!(self.miner).pending_transactions_hashes(); let pending_transactions = take_weak!(self.miner).pending_transactions_hashes();
let id = polls.create_poll(PollFilter::PendingTransaction(pending_transactions));
let id = polls.create_poll(PollFilter::PendingTransaction, best_block_number);
polls.update_transactions(&id, pending_transactions);
to_value(&U256::from(id)) to_value(&U256::from(id))
}, },
@ -337,50 +335,47 @@ impl<C, M> EthFilter for EthFilterClient<C, M>
let client = take_weak!(self.client); let client = take_weak!(self.client);
from_params::<(Index,)>(params) from_params::<(Index,)>(params)
.and_then(|(index,)| { .and_then(|(index,)| {
let info = self.polls.lock().unwrap().poll_info(&index.value()).cloned(); let mut polls = self.polls.lock().unwrap();
match info { match polls.poll_mut(&index.value()) {
None => Ok(Value::Array(vec![] as Vec<Value>)), None => Ok(Value::Array(vec![] as Vec<Value>)),
Some(info) => match info.filter { Some(filter) => match *filter {
PollFilter::Block => { PollFilter::Block(ref mut block_number) => {
// + 1, cause we want to return hashes including current block hash. // + 1, cause we want to return hashes including current block hash.
let current_number = client.chain_info().best_block_number + 1; let current_number = client.chain_info().best_block_number + 1;
let hashes = (info.block_number..current_number).into_iter() let hashes = (*block_number..current_number).into_iter()
.map(BlockId::Number) .map(BlockId::Number)
.filter_map(|id| client.block_hash(id)) .filter_map(|id| client.block_hash(id))
.collect::<Vec<H256>>(); .collect::<Vec<H256>>();
self.polls.lock().unwrap().update_poll(&index.value(), current_number); *block_number = current_number;
to_value(&hashes) to_value(&hashes)
}, },
PollFilter::PendingTransaction => { PollFilter::PendingTransaction(ref mut previous_hashes) => {
let poll_id = index.value();
let mut polls = self.polls.lock().unwrap();
let current_hashes = take_weak!(self.miner).pending_transactions_hashes(); let current_hashes = take_weak!(self.miner).pending_transactions_hashes();
let previous_hashes = polls.update_transactions(&poll_id, current_hashes.clone()).unwrap();
polls.update_poll(&poll_id, client.chain_info().best_block_number);
// calculate diff // calculate diff
let previous_hashes_set = previous_hashes.into_iter().collect::<HashSet<H256>>(); let previous_hashes_set = previous_hashes.into_iter().map(|h| h.clone()).collect::<HashSet<H256>>();
let diff = current_hashes let diff = current_hashes
.into_iter() .iter()
.filter(|hash| previous_hashes_set.contains(&hash)) .filter(|hash| previous_hashes_set.contains(&hash))
.cloned()
.collect::<Vec<H256>>(); .collect::<Vec<H256>>();
*previous_hashes = current_hashes;
to_value(&diff) to_value(&diff)
}, },
PollFilter::Logs(mut filter) => { PollFilter::Logs(ref mut block_number, ref mut filter) => {
filter.from_block = BlockId::Number(info.block_number); filter.from_block = BlockId::Number(*block_number);
filter.to_block = BlockId::Latest; filter.to_block = BlockId::Latest;
let logs = client.logs(filter) let logs = client.logs(filter.clone())
.into_iter() .into_iter()
.map(From::from) .map(From::from)
.collect::<Vec<Log>>(); .collect::<Vec<Log>>();
let current_number = client.chain_info().best_block_number; let current_number = client.chain_info().best_block_number;
self.polls.lock().unwrap().update_poll(&index.value(), current_number);
*block_number = current_number;
to_value(&logs) to_value(&logs)
} }
} }