Merge pull request #661 from ethcore/rpc_pending_filter
RPC Pending Transactions Filter
This commit is contained in:
commit
cd835e88fc
@ -82,6 +82,9 @@ pub trait MinerService : Send + Sync {
|
|||||||
fn import_transactions<T>(&self, transactions: Vec<SignedTransaction>, fetch_nonce: T) -> Result<(), Error>
|
fn import_transactions<T>(&self, transactions: Vec<SignedTransaction>, fetch_nonce: T) -> Result<(), Error>
|
||||||
where T: Fn(&Address) -> U256;
|
where T: Fn(&Address) -> U256;
|
||||||
|
|
||||||
|
/// Returns hashes of transactions currently in pending
|
||||||
|
fn pending_transactions_hashes(&self) -> Vec<H256>;
|
||||||
|
|
||||||
/// Removes all transactions from the queue and restart mining operation.
|
/// Removes all transactions from the queue and restart mining operation.
|
||||||
fn clear_and_reset(&self, chain: &BlockChainClient);
|
fn clear_and_reset(&self, chain: &BlockChainClient);
|
||||||
|
|
||||||
|
@ -25,7 +25,6 @@ use ethcore::client::{BlockChainClient, BlockId};
|
|||||||
use ethcore::block::{ClosedBlock};
|
use ethcore::block::{ClosedBlock};
|
||||||
use ethcore::error::{Error};
|
use ethcore::error::{Error};
|
||||||
use ethcore::transaction::SignedTransaction;
|
use ethcore::transaction::SignedTransaction;
|
||||||
|
|
||||||
use super::{MinerService, MinerStatus, TransactionQueue};
|
use super::{MinerService, MinerStatus, TransactionQueue};
|
||||||
|
|
||||||
/// Keeps track of transactions using priority queue and holds currently mined block.
|
/// Keeps track of transactions using priority queue and holds currently mined block.
|
||||||
@ -104,6 +103,11 @@ impl MinerService for Miner {
|
|||||||
transaction_queue.add_all(transactions, fetch_nonce)
|
transaction_queue.add_all(transactions, fetch_nonce)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn pending_transactions_hashes(&self) -> Vec<H256> {
|
||||||
|
let transaction_queue = self.transaction_queue.lock().unwrap();
|
||||||
|
transaction_queue.pending_hashes()
|
||||||
|
}
|
||||||
|
|
||||||
fn prepare_sealing(&self, chain: &BlockChainClient) {
|
fn prepare_sealing(&self, chain: &BlockChainClient) {
|
||||||
let no_of_transactions = 128;
|
let no_of_transactions = 128;
|
||||||
let transactions = self.transaction_queue.lock().unwrap().top_transactions(no_of_transactions);
|
let transactions = self.transaction_queue.lock().unwrap().top_transactions(no_of_transactions);
|
||||||
|
@ -431,6 +431,14 @@ impl TransactionQueue {
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns hashes of all transactions from current, ordered by priority.
|
||||||
|
pub fn pending_hashes(&self) -> Vec<H256> {
|
||||||
|
self.current.by_priority
|
||||||
|
.iter()
|
||||||
|
.map(|t| t.hash)
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
/// Removes all elements (in any state) from the queue
|
/// Removes all elements (in any state) from the queue
|
||||||
pub fn clear(&mut self) {
|
pub fn clear(&mut self) {
|
||||||
self.current.clear();
|
self.current.clear();
|
||||||
@ -693,6 +701,24 @@ mod test {
|
|||||||
assert_eq!(top.len(), 2);
|
assert_eq!(top.len(), 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_return_pending_hashes() {
|
||||||
|
// given
|
||||||
|
let mut txq = TransactionQueue::new();
|
||||||
|
|
||||||
|
let (tx, tx2) = new_txs(U256::from(1));
|
||||||
|
|
||||||
|
// when
|
||||||
|
txq.add(tx.clone(), &default_nonce).unwrap();
|
||||||
|
txq.add(tx2.clone(), &default_nonce).unwrap();
|
||||||
|
|
||||||
|
// then
|
||||||
|
let top = txq.pending_hashes();
|
||||||
|
assert_eq!(top[0], tx.hash());
|
||||||
|
assert_eq!(top[1], tx2.hash());
|
||||||
|
assert_eq!(top.len(), 2);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_put_transaction_to_futures_if_gap_detected() {
|
fn should_put_transaction_to_futures_if_gap_detected() {
|
||||||
// given
|
// given
|
||||||
|
@ -235,7 +235,7 @@ fn setup_rpc_server(
|
|||||||
"net" => server.add_delegate(NetClient::new(&sync).to_delegate()),
|
"net" => server.add_delegate(NetClient::new(&sync).to_delegate()),
|
||||||
"eth" => {
|
"eth" => {
|
||||||
server.add_delegate(EthClient::new(&client, &sync, &secret_store, &miner).to_delegate());
|
server.add_delegate(EthClient::new(&client, &sync, &secret_store, &miner).to_delegate());
|
||||||
server.add_delegate(EthFilterClient::new(&client).to_delegate());
|
server.add_delegate(EthFilterClient::new(&client, &miner).to_delegate());
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
die!("{}: Invalid API name to be enabled.", api);
|
die!("{}: Invalid API name to be enabled.", api);
|
||||||
|
@ -1,10 +1,13 @@
|
|||||||
//! Helper type with all filter possibilities.
|
//! Helper type with all filter possibilities.
|
||||||
|
|
||||||
|
use util::hash::H256;
|
||||||
use ethcore::filter::Filter;
|
use ethcore::filter::Filter;
|
||||||
|
|
||||||
|
pub type BlockNumber = u64;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub enum PollFilter {
|
pub enum PollFilter {
|
||||||
Block,
|
Block(BlockNumber),
|
||||||
PendingTransaction,
|
PendingTransaction(Vec<H256>),
|
||||||
Logs(Filter)
|
Logs(BlockNumber, Filter)
|
||||||
}
|
}
|
||||||
|
@ -22,28 +22,13 @@ use transient_hashmap::{TransientHashMap, Timer, StandardTimer};
|
|||||||
const POLL_LIFETIME: u64 = 60;
|
const POLL_LIFETIME: u64 = 60;
|
||||||
|
|
||||||
pub type PollId = usize;
|
pub type PollId = usize;
|
||||||
pub type BlockNumber = u64;
|
|
||||||
|
|
||||||
pub struct PollInfo<F> {
|
|
||||||
pub filter: F,
|
|
||||||
pub block_number: BlockNumber
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<F> Clone for PollInfo<F> where F: Clone {
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
PollInfo {
|
|
||||||
filter: self.filter.clone(),
|
|
||||||
block_number: self.block_number.clone()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Indexes all poll requests.
|
/// Indexes all poll requests.
|
||||||
///
|
///
|
||||||
/// Lazily garbage collects unused polls info.
|
/// Lazily garbage collects unused polls info.
|
||||||
pub struct PollManager<F, T = StandardTimer> where T: Timer {
|
pub struct PollManager<F, T = StandardTimer> where T: Timer {
|
||||||
polls: TransientHashMap<PollId, PollInfo<F>, T>,
|
polls: TransientHashMap<PollId, F, T>,
|
||||||
next_available_id: PollId
|
next_available_id: PollId,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F> PollManager<F, StandardTimer> {
|
impl<F> PollManager<F, StandardTimer> {
|
||||||
@ -54,6 +39,7 @@ impl<F> PollManager<F, StandardTimer> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<F, T> PollManager<F, T> where T: Timer {
|
impl<F, T> PollManager<F, T> where T: Timer {
|
||||||
|
|
||||||
pub fn new_with_timer(timer: T) -> Self {
|
pub fn new_with_timer(timer: T) -> Self {
|
||||||
PollManager {
|
PollManager {
|
||||||
polls: TransientHashMap::new_with_timer(POLL_LIFETIME, timer),
|
polls: TransientHashMap::new_with_timer(POLL_LIFETIME, timer),
|
||||||
@ -64,31 +50,30 @@ impl<F, T> PollManager<F, T> where T: Timer {
|
|||||||
/// Returns id which can be used for new poll.
|
/// Returns id which can be used for new poll.
|
||||||
///
|
///
|
||||||
/// Stores information when last poll happend.
|
/// Stores information when last poll happend.
|
||||||
pub fn create_poll(&mut self, filter: F, block: BlockNumber) -> PollId {
|
pub fn create_poll(&mut self, filter: F) -> PollId {
|
||||||
self.polls.prune();
|
self.polls.prune();
|
||||||
|
|
||||||
let id = self.next_available_id;
|
let id = self.next_available_id;
|
||||||
|
self.polls.insert(id, filter);
|
||||||
|
|
||||||
self.next_available_id += 1;
|
self.next_available_id += 1;
|
||||||
self.polls.insert(id, PollInfo {
|
|
||||||
filter: filter,
|
|
||||||
block_number: block,
|
|
||||||
});
|
|
||||||
id
|
id
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Updates information when last poll happend.
|
// Implementation is always using `poll_mut`
|
||||||
pub fn update_poll(&mut self, id: &PollId, block: BlockNumber) {
|
#[cfg(test)]
|
||||||
self.polls.prune();
|
/// Get a reference to stored poll filter
|
||||||
if let Some(info) = self.polls.get_mut(id) {
|
pub fn poll(&mut self, id: &PollId) -> Option<&F> {
|
||||||
info.block_number = block;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns number of block when last poll happend.
|
|
||||||
pub fn poll_info(&mut self, id: &PollId) -> Option<&PollInfo<F>> {
|
|
||||||
self.polls.prune();
|
self.polls.prune();
|
||||||
self.polls.get(id)
|
self.polls.get(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get a mutable reference to stored poll filter
|
||||||
|
pub fn poll_mut(&mut self, id: &PollId) -> Option<&mut F> {
|
||||||
|
self.polls.prune();
|
||||||
|
self.polls.get_mut(id)
|
||||||
|
}
|
||||||
|
|
||||||
/// Removes poll info.
|
/// Removes poll info.
|
||||||
pub fn remove_poll(&mut self, id: &PollId) {
|
pub fn remove_poll(&mut self, id: &PollId) {
|
||||||
self.polls.remove(id);
|
self.polls.remove(id);
|
||||||
@ -97,48 +82,46 @@ impl<F, T> PollManager<F, T> where T: Timer {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::cell::RefCell;
|
use std::cell::Cell;
|
||||||
use transient_hashmap::Timer;
|
use transient_hashmap::Timer;
|
||||||
use v1::helpers::PollManager;
|
use v1::helpers::PollManager;
|
||||||
|
|
||||||
struct TestTimer<'a> {
|
struct TestTimer<'a> {
|
||||||
time: &'a RefCell<i64>,
|
time: &'a Cell<i64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Timer for TestTimer<'a> {
|
impl<'a> Timer for TestTimer<'a> {
|
||||||
fn get_time(&self) -> i64 {
|
fn get_time(&self) -> i64 {
|
||||||
*self.time.borrow()
|
self.time.get()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_poll_indexer() {
|
fn test_poll_indexer() {
|
||||||
let time = RefCell::new(0);
|
let time = Cell::new(0);
|
||||||
let timer = TestTimer {
|
let timer = TestTimer {
|
||||||
time: &time,
|
time: &time,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut indexer = PollManager::new_with_timer(timer);
|
let mut indexer = PollManager::new_with_timer(timer);
|
||||||
assert_eq!(indexer.create_poll(false, 20), 0);
|
assert_eq!(indexer.create_poll(20), 0);
|
||||||
assert_eq!(indexer.create_poll(true, 20), 1);
|
assert_eq!(indexer.create_poll(20), 1);
|
||||||
|
|
||||||
*time.borrow_mut() = 10;
|
time.set(10);
|
||||||
indexer.update_poll(&0, 21);
|
*indexer.poll_mut(&0).unwrap() = 21;
|
||||||
assert_eq!(indexer.poll_info(&0).unwrap().filter, false);
|
assert_eq!(*indexer.poll(&0).unwrap(), 21);
|
||||||
assert_eq!(indexer.poll_info(&0).unwrap().block_number, 21);
|
assert_eq!(*indexer.poll(&1).unwrap(), 20);
|
||||||
|
|
||||||
*time.borrow_mut() = 30;
|
time.set(30);
|
||||||
indexer.update_poll(&1, 23);
|
*indexer.poll_mut(&1).unwrap() = 23;
|
||||||
assert_eq!(indexer.poll_info(&1).unwrap().filter, true);
|
assert_eq!(*indexer.poll(&1).unwrap(), 23);
|
||||||
assert_eq!(indexer.poll_info(&1).unwrap().block_number, 23);
|
|
||||||
|
|
||||||
*time.borrow_mut() = 75;
|
time.set(75);
|
||||||
indexer.update_poll(&0, 30);
|
assert!(indexer.poll(&0).is_none());
|
||||||
assert!(indexer.poll_info(&0).is_none());
|
assert_eq!(*indexer.poll(&1).unwrap(), 23);
|
||||||
assert_eq!(indexer.poll_info(&1).unwrap().filter, true);
|
|
||||||
assert_eq!(indexer.poll_info(&1).unwrap().block_number, 23);
|
|
||||||
|
|
||||||
indexer.remove_poll(&1);
|
indexer.remove_poll(&1);
|
||||||
assert!(indexer.poll_info(&1).is_none());
|
assert!(indexer.poll(&1).is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -15,7 +15,7 @@
|
|||||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
//! Eth rpc implementation.
|
//! Eth rpc implementation.
|
||||||
use std::collections::HashMap;
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::sync::{Arc, Weak, Mutex, RwLock};
|
use std::sync::{Arc, Weak, Mutex, RwLock};
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use ethsync::{SyncProvider, SyncState};
|
use ethsync::{SyncProvider, SyncState};
|
||||||
@ -316,27 +316,39 @@ impl<C, S, A, M> Eth for EthClient<C, S, A, M>
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Eth filter rpc implementation.
|
/// Eth filter rpc implementation.
|
||||||
pub struct EthFilterClient<C> where C: BlockChainClient {
|
pub struct EthFilterClient<C, M>
|
||||||
|
where C: BlockChainClient,
|
||||||
|
M: MinerService {
|
||||||
|
|
||||||
client: Weak<C>,
|
client: Weak<C>,
|
||||||
|
miner: Weak<M>,
|
||||||
polls: Mutex<PollManager<PollFilter>>,
|
polls: Mutex<PollManager<PollFilter>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<C> EthFilterClient<C> where C: BlockChainClient {
|
impl<C, M> EthFilterClient<C, M>
|
||||||
|
where C: BlockChainClient,
|
||||||
|
M: MinerService {
|
||||||
|
|
||||||
/// Creates new Eth filter client.
|
/// Creates new Eth filter client.
|
||||||
pub fn new(client: &Arc<C>) -> Self {
|
pub fn new(client: &Arc<C>, miner: &Arc<M>) -> Self {
|
||||||
EthFilterClient {
|
EthFilterClient {
|
||||||
client: Arc::downgrade(client),
|
client: Arc::downgrade(client),
|
||||||
polls: Mutex::new(PollManager::new())
|
miner: Arc::downgrade(miner),
|
||||||
|
polls: Mutex::new(PollManager::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<C> EthFilter for EthFilterClient<C> where C: BlockChainClient + 'static {
|
impl<C, M> EthFilter for EthFilterClient<C, M>
|
||||||
|
where C: BlockChainClient + 'static,
|
||||||
|
M: MinerService + 'static {
|
||||||
|
|
||||||
fn new_filter(&self, params: Params) -> Result<Value, Error> {
|
fn new_filter(&self, params: Params) -> Result<Value, Error> {
|
||||||
from_params::<(Filter,)>(params)
|
from_params::<(Filter,)>(params)
|
||||||
.and_then(|(filter,)| {
|
.and_then(|(filter,)| {
|
||||||
let mut polls = self.polls.lock().unwrap();
|
let mut polls = self.polls.lock().unwrap();
|
||||||
let id = polls.create_poll(PollFilter::Logs(filter.into()), take_weak!(self.client).chain_info().best_block_number);
|
let block_number = take_weak!(self.client).chain_info().best_block_number;
|
||||||
|
let id = polls.create_poll(PollFilter::Logs(block_number, filter.into()));
|
||||||
to_value(&U256::from(id))
|
to_value(&U256::from(id))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -345,7 +357,7 @@ impl<C> EthFilter for EthFilterClient<C> where C: BlockChainClient + 'static {
|
|||||||
match params {
|
match params {
|
||||||
Params::None => {
|
Params::None => {
|
||||||
let mut polls = self.polls.lock().unwrap();
|
let mut polls = self.polls.lock().unwrap();
|
||||||
let id = polls.create_poll(PollFilter::Block, take_weak!(self.client).chain_info().best_block_number);
|
let id = polls.create_poll(PollFilter::Block(take_weak!(self.client).chain_info().best_block_number));
|
||||||
to_value(&U256::from(id))
|
to_value(&U256::from(id))
|
||||||
},
|
},
|
||||||
_ => Err(Error::invalid_params())
|
_ => Err(Error::invalid_params())
|
||||||
@ -356,7 +368,9 @@ impl<C> EthFilter for EthFilterClient<C> where C: BlockChainClient + 'static {
|
|||||||
match params {
|
match params {
|
||||||
Params::None => {
|
Params::None => {
|
||||||
let mut polls = self.polls.lock().unwrap();
|
let mut polls = self.polls.lock().unwrap();
|
||||||
let id = polls.create_poll(PollFilter::PendingTransaction, take_weak!(self.client).chain_info().best_block_number);
|
let pending_transactions = take_weak!(self.miner).pending_transactions_hashes();
|
||||||
|
let id = polls.create_poll(PollFilter::PendingTransaction(pending_transactions));
|
||||||
|
|
||||||
to_value(&U256::from(id))
|
to_value(&U256::from(id))
|
||||||
},
|
},
|
||||||
_ => Err(Error::invalid_params())
|
_ => Err(Error::invalid_params())
|
||||||
@ -367,37 +381,47 @@ impl<C> EthFilter for EthFilterClient<C> where C: BlockChainClient + 'static {
|
|||||||
let client = take_weak!(self.client);
|
let client = take_weak!(self.client);
|
||||||
from_params::<(Index,)>(params)
|
from_params::<(Index,)>(params)
|
||||||
.and_then(|(index,)| {
|
.and_then(|(index,)| {
|
||||||
let info = self.polls.lock().unwrap().poll_info(&index.value()).cloned();
|
let mut polls = self.polls.lock().unwrap();
|
||||||
match info {
|
match polls.poll_mut(&index.value()) {
|
||||||
None => Ok(Value::Array(vec![] as Vec<Value>)),
|
None => Ok(Value::Array(vec![] as Vec<Value>)),
|
||||||
Some(info) => match info.filter {
|
Some(filter) => match *filter {
|
||||||
PollFilter::Block => {
|
PollFilter::Block(ref mut block_number) => {
|
||||||
// + 1, cause we want to return hashes including current block hash.
|
// + 1, cause we want to return hashes including current block hash.
|
||||||
let current_number = client.chain_info().best_block_number + 1;
|
let current_number = client.chain_info().best_block_number + 1;
|
||||||
let hashes = (info.block_number..current_number).into_iter()
|
let hashes = (*block_number..current_number).into_iter()
|
||||||
.map(BlockId::Number)
|
.map(BlockId::Number)
|
||||||
.filter_map(|id| client.block_hash(id))
|
.filter_map(|id| client.block_hash(id))
|
||||||
.collect::<Vec<H256>>();
|
.collect::<Vec<H256>>();
|
||||||
|
|
||||||
self.polls.lock().unwrap().update_poll(&index.value(), current_number);
|
*block_number = current_number;
|
||||||
|
|
||||||
to_value(&hashes)
|
to_value(&hashes)
|
||||||
},
|
},
|
||||||
PollFilter::PendingTransaction => {
|
PollFilter::PendingTransaction(ref mut previous_hashes) => {
|
||||||
// TODO: fix implementation once TransactionQueue is merged
|
let current_hashes = take_weak!(self.miner).pending_transactions_hashes();
|
||||||
to_value(&vec![] as &Vec<H256>)
|
// calculate diff
|
||||||
|
let previous_hashes_set = previous_hashes.into_iter().map(|h| h.clone()).collect::<HashSet<H256>>();
|
||||||
|
let diff = current_hashes
|
||||||
|
.iter()
|
||||||
|
.filter(|hash| previous_hashes_set.contains(&hash))
|
||||||
|
.cloned()
|
||||||
|
.collect::<Vec<H256>>();
|
||||||
|
|
||||||
|
*previous_hashes = current_hashes;
|
||||||
|
|
||||||
|
to_value(&diff)
|
||||||
},
|
},
|
||||||
PollFilter::Logs(mut filter) => {
|
PollFilter::Logs(ref mut block_number, ref mut filter) => {
|
||||||
filter.from_block = BlockId::Number(info.block_number);
|
filter.from_block = BlockId::Number(*block_number);
|
||||||
filter.to_block = BlockId::Latest;
|
filter.to_block = BlockId::Latest;
|
||||||
let logs = client.logs(filter)
|
let logs = client.logs(filter.clone())
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(From::from)
|
.map(From::from)
|
||||||
.collect::<Vec<Log>>();
|
.collect::<Vec<Log>>();
|
||||||
|
|
||||||
let current_number = client.chain_info().best_block_number;
|
let current_number = client.chain_info().best_block_number;
|
||||||
self.polls.lock().unwrap().update_poll(&index.value(), current_number);
|
|
||||||
|
|
||||||
|
*block_number = current_number;
|
||||||
to_value(&logs)
|
to_value(&logs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user