diff --git a/miner/src/lib.rs b/miner/src/lib.rs index 135a15df5..10da070a4 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -82,6 +82,9 @@ pub trait MinerService : Send + Sync { fn import_transactions(&self, transactions: Vec, fetch_nonce: T) -> Result<(), Error> where T: Fn(&Address) -> U256; + /// Returns hashes of transactions currently in pending + fn pending_transactions_hashes(&self) -> Vec; + /// Removes all transactions from the queue and restart mining operation. fn clear_and_reset(&self, chain: &BlockChainClient); diff --git a/miner/src/miner.rs b/miner/src/miner.rs index 623af33a0..00efb83d3 100644 --- a/miner/src/miner.rs +++ b/miner/src/miner.rs @@ -25,7 +25,6 @@ use ethcore::client::{BlockChainClient, BlockId}; use ethcore::block::{ClosedBlock}; use ethcore::error::{Error}; use ethcore::transaction::SignedTransaction; - use super::{MinerService, MinerStatus, TransactionQueue}; /// Keeps track of transactions using priority queue and holds currently mined block. @@ -104,6 +103,11 @@ impl MinerService for Miner { transaction_queue.add_all(transactions, fetch_nonce) } + fn pending_transactions_hashes(&self) -> Vec { + let transaction_queue = self.transaction_queue.lock().unwrap(); + transaction_queue.pending_hashes() + } + fn prepare_sealing(&self, chain: &BlockChainClient) { let no_of_transactions = 128; let transactions = self.transaction_queue.lock().unwrap().top_transactions(no_of_transactions); diff --git a/miner/src/transaction_queue.rs b/miner/src/transaction_queue.rs index 3d5c38b0c..e14b197ae 100644 --- a/miner/src/transaction_queue.rs +++ b/miner/src/transaction_queue.rs @@ -431,6 +431,14 @@ impl TransactionQueue { .collect() } + /// Returns hashes of all transactions from current, ordered by priority. + pub fn pending_hashes(&self) -> Vec { + self.current.by_priority + .iter() + .map(|t| t.hash) + .collect() + } + /// Removes all elements (in any state) from the queue pub fn clear(&mut self) { self.current.clear(); @@ -693,6 +701,24 @@ mod test { assert_eq!(top.len(), 2); } + #[test] + fn should_return_pending_hashes() { + // given + let mut txq = TransactionQueue::new(); + + let (tx, tx2) = new_txs(U256::from(1)); + + // when + txq.add(tx.clone(), &default_nonce).unwrap(); + txq.add(tx2.clone(), &default_nonce).unwrap(); + + // then + let top = txq.pending_hashes(); + assert_eq!(top[0], tx.hash()); + assert_eq!(top[1], tx2.hash()); + assert_eq!(top.len(), 2); + } + #[test] fn should_put_transaction_to_futures_if_gap_detected() { // given diff --git a/parity/main.rs b/parity/main.rs index b2f9fba22..10d59438d 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -235,7 +235,7 @@ fn setup_rpc_server( "net" => server.add_delegate(NetClient::new(&sync).to_delegate()), "eth" => { server.add_delegate(EthClient::new(&client, &sync, &secret_store, &miner).to_delegate()); - server.add_delegate(EthFilterClient::new(&client).to_delegate()); + server.add_delegate(EthFilterClient::new(&client, &miner).to_delegate()); } _ => { die!("{}: Invalid API name to be enabled.", api); diff --git a/rpc/src/v1/helpers/poll_filter.rs b/rpc/src/v1/helpers/poll_filter.rs index 465290270..f9ed6230c 100644 --- a/rpc/src/v1/helpers/poll_filter.rs +++ b/rpc/src/v1/helpers/poll_filter.rs @@ -1,10 +1,13 @@ //! Helper type with all filter possibilities. +use util::hash::H256; use ethcore::filter::Filter; +pub type BlockNumber = u64; + #[derive(Clone)] pub enum PollFilter { - Block, - PendingTransaction, - Logs(Filter) + Block(BlockNumber), + PendingTransaction(Vec), + Logs(BlockNumber, Filter) } diff --git a/rpc/src/v1/helpers/poll_manager.rs b/rpc/src/v1/helpers/poll_manager.rs index 0297384d1..9735d7d5d 100644 --- a/rpc/src/v1/helpers/poll_manager.rs +++ b/rpc/src/v1/helpers/poll_manager.rs @@ -22,28 +22,13 @@ use transient_hashmap::{TransientHashMap, Timer, StandardTimer}; const POLL_LIFETIME: u64 = 60; pub type PollId = usize; -pub type BlockNumber = u64; - -pub struct PollInfo { - pub filter: F, - pub block_number: BlockNumber -} - -impl Clone for PollInfo where F: Clone { - fn clone(&self) -> Self { - PollInfo { - filter: self.filter.clone(), - block_number: self.block_number.clone() - } - } -} /// Indexes all poll requests. /// /// Lazily garbage collects unused polls info. pub struct PollManager where T: Timer { - polls: TransientHashMap, T>, - next_available_id: PollId + polls: TransientHashMap, + next_available_id: PollId, } impl PollManager { @@ -54,6 +39,7 @@ impl PollManager { } impl PollManager where T: Timer { + pub fn new_with_timer(timer: T) -> Self { PollManager { polls: TransientHashMap::new_with_timer(POLL_LIFETIME, timer), @@ -64,31 +50,30 @@ impl PollManager where T: Timer { /// Returns id which can be used for new poll. /// /// Stores information when last poll happend. - pub fn create_poll(&mut self, filter: F, block: BlockNumber) -> PollId { + pub fn create_poll(&mut self, filter: F) -> PollId { self.polls.prune(); + let id = self.next_available_id; + self.polls.insert(id, filter); + self.next_available_id += 1; - self.polls.insert(id, PollInfo { - filter: filter, - block_number: block, - }); id } - /// Updates information when last poll happend. - pub fn update_poll(&mut self, id: &PollId, block: BlockNumber) { - self.polls.prune(); - if let Some(info) = self.polls.get_mut(id) { - info.block_number = block; - } - } - - /// Returns number of block when last poll happend. - pub fn poll_info(&mut self, id: &PollId) -> Option<&PollInfo> { + // Implementation is always using `poll_mut` + #[cfg(test)] + /// Get a reference to stored poll filter + pub fn poll(&mut self, id: &PollId) -> Option<&F> { self.polls.prune(); self.polls.get(id) } + /// Get a mutable reference to stored poll filter + pub fn poll_mut(&mut self, id: &PollId) -> Option<&mut F> { + self.polls.prune(); + self.polls.get_mut(id) + } + /// Removes poll info. pub fn remove_poll(&mut self, id: &PollId) { self.polls.remove(id); @@ -97,48 +82,46 @@ impl PollManager where T: Timer { #[cfg(test)] mod tests { - use std::cell::RefCell; + use std::cell::Cell; use transient_hashmap::Timer; use v1::helpers::PollManager; struct TestTimer<'a> { - time: &'a RefCell, + time: &'a Cell, } impl<'a> Timer for TestTimer<'a> { fn get_time(&self) -> i64 { - *self.time.borrow() + self.time.get() } } #[test] fn test_poll_indexer() { - let time = RefCell::new(0); + let time = Cell::new(0); let timer = TestTimer { time: &time, }; let mut indexer = PollManager::new_with_timer(timer); - assert_eq!(indexer.create_poll(false, 20), 0); - assert_eq!(indexer.create_poll(true, 20), 1); + assert_eq!(indexer.create_poll(20), 0); + assert_eq!(indexer.create_poll(20), 1); - *time.borrow_mut() = 10; - indexer.update_poll(&0, 21); - assert_eq!(indexer.poll_info(&0).unwrap().filter, false); - assert_eq!(indexer.poll_info(&0).unwrap().block_number, 21); + time.set(10); + *indexer.poll_mut(&0).unwrap() = 21; + assert_eq!(*indexer.poll(&0).unwrap(), 21); + assert_eq!(*indexer.poll(&1).unwrap(), 20); - *time.borrow_mut() = 30; - indexer.update_poll(&1, 23); - assert_eq!(indexer.poll_info(&1).unwrap().filter, true); - assert_eq!(indexer.poll_info(&1).unwrap().block_number, 23); + time.set(30); + *indexer.poll_mut(&1).unwrap() = 23; + assert_eq!(*indexer.poll(&1).unwrap(), 23); - *time.borrow_mut() = 75; - indexer.update_poll(&0, 30); - assert!(indexer.poll_info(&0).is_none()); - assert_eq!(indexer.poll_info(&1).unwrap().filter, true); - assert_eq!(indexer.poll_info(&1).unwrap().block_number, 23); + time.set(75); + assert!(indexer.poll(&0).is_none()); + assert_eq!(*indexer.poll(&1).unwrap(), 23); indexer.remove_poll(&1); - assert!(indexer.poll_info(&1).is_none()); + assert!(indexer.poll(&1).is_none()); } + } diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 7ebf4b4d1..5e8db172a 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . //! Eth rpc implementation. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::{Arc, Weak, Mutex, RwLock}; use std::ops::Deref; use ethsync::{SyncProvider, SyncState}; @@ -316,27 +316,39 @@ impl Eth for EthClient } /// Eth filter rpc implementation. -pub struct EthFilterClient where C: BlockChainClient { +pub struct EthFilterClient + where C: BlockChainClient, + M: MinerService { + client: Weak, + miner: Weak, polls: Mutex>, } -impl EthFilterClient where C: BlockChainClient { +impl EthFilterClient + where C: BlockChainClient, + M: MinerService { + /// Creates new Eth filter client. - pub fn new(client: &Arc) -> Self { + pub fn new(client: &Arc, miner: &Arc) -> Self { EthFilterClient { client: Arc::downgrade(client), - polls: Mutex::new(PollManager::new()) + miner: Arc::downgrade(miner), + polls: Mutex::new(PollManager::new()), } } } -impl EthFilter for EthFilterClient where C: BlockChainClient + 'static { +impl EthFilter for EthFilterClient + where C: BlockChainClient + 'static, + M: MinerService + 'static { + fn new_filter(&self, params: Params) -> Result { from_params::<(Filter,)>(params) .and_then(|(filter,)| { let mut polls = self.polls.lock().unwrap(); - let id = polls.create_poll(PollFilter::Logs(filter.into()), take_weak!(self.client).chain_info().best_block_number); + let block_number = take_weak!(self.client).chain_info().best_block_number; + let id = polls.create_poll(PollFilter::Logs(block_number, filter.into())); to_value(&U256::from(id)) }) } @@ -345,7 +357,7 @@ impl EthFilter for EthFilterClient where C: BlockChainClient + 'static { match params { Params::None => { let mut polls = self.polls.lock().unwrap(); - let id = polls.create_poll(PollFilter::Block, take_weak!(self.client).chain_info().best_block_number); + let id = polls.create_poll(PollFilter::Block(take_weak!(self.client).chain_info().best_block_number)); to_value(&U256::from(id)) }, _ => Err(Error::invalid_params()) @@ -356,7 +368,9 @@ impl EthFilter for EthFilterClient where C: BlockChainClient + 'static { match params { Params::None => { let mut polls = self.polls.lock().unwrap(); - let id = polls.create_poll(PollFilter::PendingTransaction, take_weak!(self.client).chain_info().best_block_number); + let pending_transactions = take_weak!(self.miner).pending_transactions_hashes(); + let id = polls.create_poll(PollFilter::PendingTransaction(pending_transactions)); + to_value(&U256::from(id)) }, _ => Err(Error::invalid_params()) @@ -367,37 +381,47 @@ impl EthFilter for EthFilterClient where C: BlockChainClient + 'static { let client = take_weak!(self.client); from_params::<(Index,)>(params) .and_then(|(index,)| { - let info = self.polls.lock().unwrap().poll_info(&index.value()).cloned(); - match info { + let mut polls = self.polls.lock().unwrap(); + match polls.poll_mut(&index.value()) { None => Ok(Value::Array(vec![] as Vec)), - Some(info) => match info.filter { - PollFilter::Block => { + Some(filter) => match *filter { + PollFilter::Block(ref mut block_number) => { // + 1, cause we want to return hashes including current block hash. let current_number = client.chain_info().best_block_number + 1; - let hashes = (info.block_number..current_number).into_iter() + let hashes = (*block_number..current_number).into_iter() .map(BlockId::Number) .filter_map(|id| client.block_hash(id)) .collect::>(); - self.polls.lock().unwrap().update_poll(&index.value(), current_number); + *block_number = current_number; to_value(&hashes) }, - PollFilter::PendingTransaction => { - // TODO: fix implementation once TransactionQueue is merged - to_value(&vec![] as &Vec) + PollFilter::PendingTransaction(ref mut previous_hashes) => { + let current_hashes = take_weak!(self.miner).pending_transactions_hashes(); + // calculate diff + let previous_hashes_set = previous_hashes.into_iter().map(|h| h.clone()).collect::>(); + let diff = current_hashes + .iter() + .filter(|hash| previous_hashes_set.contains(&hash)) + .cloned() + .collect::>(); + + *previous_hashes = current_hashes; + + to_value(&diff) }, - PollFilter::Logs(mut filter) => { - filter.from_block = BlockId::Number(info.block_number); + PollFilter::Logs(ref mut block_number, ref mut filter) => { + filter.from_block = BlockId::Number(*block_number); filter.to_block = BlockId::Latest; - let logs = client.logs(filter) + let logs = client.logs(filter.clone()) .into_iter() .map(From::from) .collect::>(); let current_number = client.chain_info().best_block_number; - self.polls.lock().unwrap().update_poll(&index.value(), current_number); + *block_number = current_number; to_value(&logs) } }