From 236a4aac22890bf3823797e15fb16251ab30f6ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 16 Feb 2018 16:51:34 +0100 Subject: [PATCH] Pending transactions subscription (#7906) * Pending transactions subscription. * Pending transactions Pub-Sub * Add light client support. * Remove redundant Ok --- ethcore/light/src/transaction_queue.rs | 54 +++++++++++++++++++++----- ethcore/src/miner/miner.rs | 30 +++++++++++--- parity/rpc_apis.rs | 19 +++++++-- rpc/src/v1/impls/eth_pubsub.rs | 38 ++++++++++++++---- rpc/src/v1/tests/mocked/eth_pubsub.rs | 54 ++++++++++++++++++++++++-- rpc/src/v1/types/pubsub.rs | 5 ++- 6 files changed, 168 insertions(+), 32 deletions(-) diff --git a/ethcore/light/src/transaction_queue.rs b/ethcore/light/src/transaction_queue.rs index ff4baf776..156152253 100644 --- a/ethcore/light/src/transaction_queue.rs +++ b/ethcore/light/src/transaction_queue.rs @@ -23,6 +23,7 @@ //! accounts for which they create transactions, this queue is structured in an //! address-wise manner. +use std::fmt; use std::collections::{BTreeMap, HashMap}; use std::collections::hash_map::Entry; @@ -99,25 +100,44 @@ impl AccountTransactions { } // attempt to move transactions from the future queue into the current queue. - fn adjust_future(&mut self) { + fn adjust_future(&mut self) -> Vec { + let mut promoted = Vec::new(); let mut next_nonce = self.next_nonce(); loop { match self.future.remove(&next_nonce) { - Some(tx) => self.current.push(tx), + Some(tx) => { + promoted.push(tx.hash); + self.current.push(tx) + }, None => break, } next_nonce = next_nonce + 1.into(); } + + promoted } } +type Listener = Box; + /// Light transaction queue. See module docs for more details. -#[derive(Debug, Default, Clone, PartialEq, Eq)] +#[derive(Default)] pub struct TransactionQueue { by_account: HashMap, by_hash: H256FastMap, + listeners: Vec, +} + +impl fmt::Debug for TransactionQueue { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("TransactionQueue") + .field("by_account", &self.by_account) + .field("by_hash", &self.by_hash) + .field("listeners", &self.listeners.len()) + .finish() + } } impl TransactionQueue { @@ -130,7 +150,7 @@ impl TransactionQueue { if self.by_hash.contains_key(&hash) { return Err(transaction::Error::AlreadyImported) } - let res = match self.by_account.entry(sender) { + let (res, promoted) = match self.by_account.entry(sender) { Entry::Vacant(entry) => { entry.insert(AccountTransactions { cur_nonce: CurrentNonce::Assumed(nonce), @@ -138,7 +158,7 @@ impl TransactionQueue { future: BTreeMap::new(), }); - transaction::ImportResult::Current + (transaction::ImportResult::Current, vec![hash]) } Entry::Occupied(mut entry) => { let acct_txs = entry.get_mut(); @@ -160,7 +180,7 @@ impl TransactionQueue { let old = ::std::mem::replace(&mut acct_txs.current[idx], tx_info); self.by_hash.remove(&old.hash); - transaction::ImportResult::Current + (transaction::ImportResult::Current, vec![hash]) } Err(idx) => { let cur_len = acct_txs.current.len(); @@ -182,21 +202,22 @@ impl TransactionQueue { acct_txs.future.insert(future_nonce, future); } - transaction::ImportResult::Current + (transaction::ImportResult::Current, vec![hash]) } else if idx == cur_len && acct_txs.current.last().map_or(false, |f| f.nonce + 1.into() != nonce) { trace!(target: "txqueue", "Queued future transaction for {}, nonce={}", sender, nonce); let future_nonce = nonce; acct_txs.future.insert(future_nonce, tx_info); - transaction::ImportResult::Future + (transaction::ImportResult::Future, vec![]) } else { trace!(target: "txqueue", "Queued current transaction for {}, nonce={}", sender, nonce); // insert, then check if we've filled any gaps. acct_txs.current.insert(idx, tx_info); - acct_txs.adjust_future(); + let mut promoted = acct_txs.adjust_future(); + promoted.insert(0, hash); - transaction::ImportResult::Current + (transaction::ImportResult::Current, promoted) } } } @@ -204,6 +225,7 @@ impl TransactionQueue { }; self.by_hash.insert(hash, tx); + self.notify(&promoted); Ok(res) } @@ -324,6 +346,18 @@ impl TransactionQueue { pub fn get(&self, hash: &H256) -> Option<&PendingTransaction> { self.by_hash.get(&hash) } + + /// Add a transaction queue listener. + pub fn add_listener(&mut self, f: Listener) { + self.listeners.push(f); + } + + /// Notifies all listeners about new pending transaction. + fn notify(&self, hashes: &[H256]) { + for listener in &self.listeners { + listener(hashes) + } + } } #[cfg(test)] diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index dc0673d07..21fbac24b 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -248,6 +248,7 @@ struct SealingWork { pub struct Miner { // NOTE [ToDr] When locking always lock in this order! transaction_queue: Arc>, + transaction_listener: RwLock>>, sealing_work: Mutex, next_allowed_reseal: Mutex, next_mandatory_reseal: RwLock, @@ -314,6 +315,7 @@ impl Miner { Miner { transaction_queue: Arc::new(RwLock::new(txq)), + transaction_listener: RwLock::new(vec![]), next_allowed_reseal: Mutex::new(Instant::now()), next_mandatory_reseal: RwLock::new(Instant::now() + options.reseal_max_period), sealing_block_last_request: Mutex::new(0), @@ -369,6 +371,11 @@ impl Miner { self.map_pending_block(|b| b.header().clone(), latest_block_number) } + /// Set a callback to be notified about imported transactions' hashes. + pub fn add_transactions_listener(&self, f: Box) { + self.transaction_listener.write().push(f); + } + fn map_pending_block(&self, f: F, latest_block_number: BlockNumber) -> Option where F: FnOnce(&ClosedBlock) -> T, { @@ -694,8 +701,9 @@ impl Miner { ) -> Vec> { let best_block_header = client.best_block_header().decode(); let insertion_time = client.chain_info().best_block_number; + let mut inserted = Vec::with_capacity(transactions.len()); - transactions.into_iter() + let results = transactions.into_iter() .map(|tx| { let hash = tx.hash(); if client.transaction_block(TransactionId::Hash(hash)).is_some() { @@ -721,18 +729,28 @@ impl Miner { }).unwrap_or(default_origin); let details_provider = TransactionDetailsProvider::new(client, &self.service_transaction_action); - match origin { + let hash = transaction.hash(); + let result = match origin { TransactionOrigin::Local | TransactionOrigin::RetractedBlock => { - Ok(transaction_queue.add(transaction, origin, insertion_time, condition.clone(), &details_provider)?) + transaction_queue.add(transaction, origin, insertion_time, condition.clone(), &details_provider)? }, TransactionOrigin::External => { - Ok(transaction_queue.add_with_banlist(transaction, insertion_time, &details_provider)?) + transaction_queue.add_with_banlist(transaction, insertion_time, &details_provider)? }, - } + }; + + inserted.push(hash); + Ok(result) }, } }) - .collect() + .collect(); + + for listener in &*self.transaction_listener.read() { + listener(&inserted); + } + + results } /// Are we allowed to do a non-mandatory reseal? diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index e519daf77..f2ab07591 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -295,7 +295,14 @@ impl FullDependencies { Api::EthPubSub => { if !for_generic_pubsub { let client = EthPubSubClient::new(self.client.clone(), self.remote.clone()); - self.client.add_notify(client.handler()); + let h = client.handler(); + self.miner.add_transactions_listener(Box::new(move |hashes| if let Some(h) = h.upgrade() { + h.new_transactions(hashes); + })); + + if let Some(h) = client.handler().upgrade() { + self.client.add_notify(h); + } handler.extend_with(client.to_delegate()); } }, @@ -501,9 +508,13 @@ impl LightDependencies { self.remote.clone(), self.gas_price_percentile, ); - self.client.add_listener( - Arc::downgrade(&client.handler()) as Weak<::light::client::LightChainNotify> - ); + self.client.add_listener(client.handler() as Weak<_>); + let h = client.handler(); + self.transaction_queue.write().add_listener(Box::new(move |transactions| { + if let Some(h) = h.upgrade() { + h.new_transactions(transactions); + } + })); handler.extend_with(EthPubSub::to_delegate(client)); }, Api::Personal => { diff --git a/rpc/src/v1/impls/eth_pubsub.rs b/rpc/src/v1/impls/eth_pubsub.rs index 938562a12..315fc2f4f 100644 --- a/rpc/src/v1/impls/eth_pubsub.rs +++ b/rpc/src/v1/impls/eth_pubsub.rs @@ -16,7 +16,7 @@ //! Eth PUB-SUB rpc implementation. -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::collections::BTreeMap; use jsonrpc_core::{BoxFuture, Result, Error}; @@ -50,6 +50,7 @@ pub struct EthPubSubClient { handler: Arc>, heads_subscribers: Arc>>, logs_subscribers: Arc>>, + transactions_subscribers: Arc>>, } impl EthPubSubClient { @@ -57,15 +58,19 @@ impl EthPubSubClient { pub fn new(client: Arc, remote: Remote) -> Self { let heads_subscribers = Arc::new(RwLock::new(Subscribers::default())); let logs_subscribers = Arc::new(RwLock::new(Subscribers::default())); + let transactions_subscribers = Arc::new(RwLock::new(Subscribers::default())); + EthPubSubClient { handler: Arc::new(ChainNotificationHandler { client, remote, heads_subscribers: heads_subscribers.clone(), logs_subscribers: logs_subscribers.clone(), + transactions_subscribers: transactions_subscribers.clone(), }), heads_subscribers, logs_subscribers, + transactions_subscribers, } } @@ -75,12 +80,13 @@ impl EthPubSubClient { let client = Self::new(client, remote); *client.heads_subscribers.write() = Subscribers::new_test(); *client.logs_subscribers.write() = Subscribers::new_test(); + *client.transactions_subscribers.write() = Subscribers::new_test(); client } /// Returns a chain notification handler. - pub fn handler(&self) -> Arc> { - self.handler.clone() + pub fn handler(&self) -> Weak> { + Arc::downgrade(&self.handler) } } @@ -111,6 +117,7 @@ pub struct ChainNotificationHandler { remote: Remote, heads_subscribers: Arc>>, logs_subscribers: Arc>>, + transactions_subscribers: Arc>>, } impl ChainNotificationHandler { @@ -164,6 +171,15 @@ impl ChainNotificationHandler { ); } } + + /// Notify all subscribers about new transaction hashes. + pub fn new_transactions(&self, hashes: &[H256]) { + for subscriber in self.transactions_subscribers.read().values() { + for hash in hashes { + Self::notify(&self.remote, subscriber, pubsub::Result::TransactionHash((*hash).into())); + } + } + } } /// A light client wrapper struct. @@ -256,16 +272,23 @@ impl EthPubSub for EthPubSubClient { self.heads_subscribers.write().push(subscriber); return; }, + (pubsub::Kind::NewHeads, _) => { + errors::invalid_params("newHeads", "Expected no parameters.") + }, (pubsub::Kind::Logs, Some(pubsub::Params::Logs(filter))) => { self.logs_subscribers.write().push(subscriber, filter.into()); return; }, - (pubsub::Kind::NewHeads, _) => { - errors::invalid_params("newHeads", "Expected no parameters.") - }, (pubsub::Kind::Logs, _) => { errors::invalid_params("logs", "Expected a filter object.") }, + (pubsub::Kind::NewPendingTransactions, None) => { + self.transactions_subscribers.write().push(subscriber); + return; + }, + (pubsub::Kind::NewPendingTransactions, _) => { + errors::invalid_params("newPendingTransactions", "Expected no parameters.") + }, _ => { errors::unimplemented(None) }, @@ -277,7 +300,8 @@ impl EthPubSub for EthPubSubClient { fn unsubscribe(&self, id: SubscriptionId) -> Result { let res = self.heads_subscribers.write().remove(&id).is_some(); let res2 = self.logs_subscribers.write().remove(&id).is_some(); + let res3 = self.transactions_subscribers.write().remove(&id).is_some(); - Ok(res || res2) + Ok(res || res2 || res3) } } diff --git a/rpc/src/v1/tests/mocked/eth_pubsub.rs b/rpc/src/v1/tests/mocked/eth_pubsub.rs index 56843ca62..0cf119432 100644 --- a/rpc/src/v1/tests/mocked/eth_pubsub.rs +++ b/rpc/src/v1/tests/mocked/eth_pubsub.rs @@ -37,7 +37,7 @@ fn should_subscribe_to_new_heads() { let h1 = client.block_hash_delta_minus(3); let pubsub = EthPubSubClient::new_test(Arc::new(client), el.remote()); - let handler = pubsub.handler(); + let handler = pubsub.handler().upgrade().unwrap(); let pubsub = pubsub.to_delegate(); let mut io = MetaIoHandler::default(); @@ -109,7 +109,7 @@ fn should_subscribe_to_logs() { ]); let pubsub = EthPubSubClient::new_test(Arc::new(client), el.remote()); - let handler = pubsub.handler(); + let handler = pubsub.handler().upgrade().unwrap(); let pubsub = pubsub.to_delegate(); let mut io = MetaIoHandler::default(); @@ -150,6 +150,54 @@ fn should_subscribe_to_logs() { assert_eq!(res, None); } + +#[test] +fn should_subscribe_to_pending_transactions() { + // given + let el = EventLoop::spawn(); + let client = TestBlockChainClient::new(); + + let pubsub = EthPubSubClient::new_test(Arc::new(client), el.remote()); + let handler = pubsub.handler().upgrade().unwrap(); + let pubsub = pubsub.to_delegate(); + + let mut io = MetaIoHandler::default(); + io.extend_with(pubsub); + + let mut metadata = Metadata::default(); + let (sender, receiver) = futures::sync::mpsc::channel(8); + metadata.session = Some(Arc::new(Session::new(sender))); + + // Fail if params are provided + let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["newPendingTransactions", {}], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Couldn't parse parameters: newPendingTransactions","data":"\"Expected no parameters.\""},"id":1}"#; + assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); + + // Subscribe + let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["newPendingTransactions"], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","result":"0x416d77337e24399d","id":1}"#; + assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); + + // Send new transactions + handler.new_transactions(&[5.into(), 7.into()]); + + let (res, receiver) = receiver.into_future().wait().unwrap(); + let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x0000000000000000000000000000000000000000000000000000000000000005","subscription":"0x416d77337e24399d"}}"#; + assert_eq!(res, Some(response.into())); + + let (res, receiver) = receiver.into_future().wait().unwrap(); + let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x0000000000000000000000000000000000000000000000000000000000000007","subscription":"0x416d77337e24399d"}}"#; + assert_eq!(res, Some(response.into())); + + // And unsubscribe + let request = r#"{"jsonrpc": "2.0", "method": "eth_unsubscribe", "params": ["0x416d77337e24399d"], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; + assert_eq!(io.handle_request_sync(request, metadata), Some(response.to_owned())); + + let (res, _receiver) = receiver.into_future().wait().unwrap(); + assert_eq!(res, None); +} + #[test] fn should_return_unimplemented() { // given @@ -167,8 +215,6 @@ fn should_return_unimplemented() { // Subscribe let response = r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"This request is not implemented yet. Please create an issue on Github repo."},"id":1}"#; - let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["newPendingTransactions"], "id": 1}"#; - assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["syncing"], "id": 1}"#; assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); } diff --git a/rpc/src/v1/types/pubsub.rs b/rpc/src/v1/types/pubsub.rs index 608a1cbcc..dfac5a0ab 100644 --- a/rpc/src/v1/types/pubsub.rs +++ b/rpc/src/v1/types/pubsub.rs @@ -19,7 +19,7 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde::de::Error; use serde_json::{Value, from_value}; -use v1::types::{RichHeader, Filter, Log}; +use v1::types::{RichHeader, Filter, Log, H256}; /// Subscription result. #[derive(Debug, Clone, PartialEq, Eq)] @@ -28,6 +28,8 @@ pub enum Result { Header(RichHeader), /// Log Log(Log), + /// Transaction hash + TransactionHash(H256), } impl Serialize for Result { @@ -37,6 +39,7 @@ impl Serialize for Result { match *self { Result::Header(ref header) => header.serialize(serializer), Result::Log(ref log) => log.serialize(serializer), + Result::TransactionHash(ref hash) => hash.serialize(serializer), } } }