From 944bf6a59e769d6daea7a115a38a022bd4eaf661 Mon Sep 17 00:00:00 2001 From: IntegralTeam <35330335+IntegralTeam@users.noreply.github.com> Date: Fri, 12 Apr 2019 18:36:49 +0700 Subject: [PATCH] Watch transactions pool (#10558) adds `parity_watchTransactionsPool` --- Cargo.lock | 5 ++ ethcore/Cargo.toml | 1 + ethcore/light/Cargo.toml | 1 + ethcore/light/src/lib.rs | 1 + ethcore/light/src/transaction_queue.rs | 25 +++++- ethcore/src/lib.rs | 1 + ethcore/src/miner/miner.rs | 10 ++- miner/Cargo.toml | 3 + miner/src/lib.rs | 3 + miner/src/pool/listener.rs | 61 +++++++++++++++ miner/src/pool/mod.rs | 18 +++++ miner/src/pool/queue.rs | 13 +++- parity/cli/mod.rs | 6 +- parity/rpc_apis.rs | 29 +++++++ rpc/src/v1/impls/mod.rs | 2 + rpc/src/v1/impls/transactions_pool.rs | 101 +++++++++++++++++++++++++ rpc/src/v1/traits/mod.rs | 2 + rpc/src/v1/traits/transactions_pool.rs | 23 ++++++ 18 files changed, 297 insertions(+), 8 deletions(-) create mode 100644 rpc/src/v1/impls/transactions_pool.rs create mode 100644 rpc/src/v1/traits/transactions_pool.rs diff --git a/Cargo.lock b/Cargo.lock index a56d1b663..9c7c31f56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -729,6 +729,7 @@ dependencies = [ "ethkey 0.3.0", "evm 0.1.0", "fetch 0.1.0", + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "hash-db 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "heapsize 0.4.2 (git+https://github.com/cheme/heapsize.git?branch=ec-macfix)", "itertools 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)", @@ -876,6 +877,7 @@ dependencies = [ "ethcore-blockchain 0.1.0", "ethcore-db 0.1.0", "ethcore-io 1.12.0", + "ethcore-miner 1.12.0", "ethcore-network 1.12.0", "ethereum-types 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "failsafe 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -950,6 +952,9 @@ dependencies = [ "price-info 1.12.0", "rlp 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", "trace-time 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "transaction-pool 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index 336d6a0fc..1e64bbe1e 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -30,6 +30,7 @@ ethereum-types = "0.4" ethjson = { path = "../json" } ethkey = { path = "../accounts/ethkey" } evm = { path = "evm" } +futures = "0.1" hash-db = "0.11.0" heapsize = "0.4" itertools = "0.5" diff --git a/ethcore/light/Cargo.toml b/ethcore/light/Cargo.toml index 756b76f1f..e53d29a46 100644 --- a/ethcore/light/Cargo.toml +++ b/ethcore/light/Cargo.toml @@ -18,6 +18,7 @@ memory-db = "0.11.0" trie-db = "0.11.0" patricia-trie-ethereum = { path = "../../util/patricia-trie-ethereum" } ethcore-network = { path = "../../util/network" } +ethcore-miner = { path = "../../miner" } ethcore-io = { path = "../../util/io" } hash-db = "0.11.0" heapsize = "0.4" diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs index 93e912e1d..31deecf31 100644 --- a/ethcore/light/src/lib.rs +++ b/ethcore/light/src/lib.rs @@ -61,6 +61,7 @@ extern crate ethcore_io as io; extern crate ethcore_network as network; extern crate parity_bytes as bytes; extern crate ethereum_types; +extern crate ethcore_miner as miner; extern crate ethcore; extern crate hash_db; extern crate heapsize; diff --git a/ethcore/light/src/transaction_queue.rs b/ethcore/light/src/transaction_queue.rs index 65e646d84..f996a4847 100644 --- a/ethcore/light/src/transaction_queue.rs +++ b/ethcore/light/src/transaction_queue.rs @@ -24,12 +24,15 @@ //! address-wise manner. use std::fmt; +use std::sync::Arc; use std::collections::{BTreeMap, HashMap}; use std::collections::hash_map::Entry; use common_types::transaction::{self, Condition, PendingTransaction, SignedTransaction}; use ethereum_types::{H256, U256, Address}; use fastmap::H256FastMap; +use futures::sync::mpsc; +use miner::pool::TxStatus; // Knowledge of an account's current nonce. #[derive(Debug, Clone, PartialEq, Eq)] @@ -134,6 +137,7 @@ pub struct TransactionQueue { by_account: HashMap, by_hash: H256FastMap, listeners: Vec, + tx_statuses_listeners: Vec>>>, } impl fmt::Debug for TransactionQueue { @@ -231,7 +235,7 @@ impl TransactionQueue { }; self.by_hash.insert(hash, tx); - self.notify(&promoted); + self.notify(&promoted, TxStatus::Added); Ok(res) } @@ -343,6 +347,8 @@ impl TransactionQueue { trace!(target: "txqueue", "Culled {} old transactions from sender {} (nonce={})", removed_hashes.len(), address, cur_nonce); + self.notify(&removed_hashes, TxStatus::Culled); + for hash in removed_hashes { self.by_hash.remove(&hash); } @@ -358,11 +364,26 @@ impl TransactionQueue { self.listeners.push(f); } + /// Add a transaction queue listener. + pub fn tx_statuses_receiver(&mut self) -> mpsc::UnboundedReceiver>> { + let (sender, receiver) = mpsc::unbounded(); + self.tx_statuses_listeners.push(sender); + receiver + } + /// Notifies all listeners about new pending transaction. - fn notify(&self, hashes: &[H256]) { + fn notify(&mut self, hashes: &[H256], status: TxStatus) { for listener in &self.listeners { listener(hashes) } + + let to_send: Arc> = Arc::new( + hashes + .into_iter() + .map(|hash| (hash.clone(), status)).collect() + ); + + self.tx_statuses_listeners.retain(| listener| listener.unbounded_send(to_send.clone()).is_ok()); } } diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index eee076b32..ef669d6d1 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -74,6 +74,7 @@ extern crate ethcore_miner; extern crate ethereum_types; extern crate ethjson; extern crate ethkey; +extern crate futures; extern crate hash_db; extern crate heapsize; extern crate itertools; diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index d9d4570a7..0c55af942 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -24,11 +24,12 @@ use bytes::Bytes; use call_contract::CallContract; use ethcore_miner::gas_pricer::GasPricer; use ethcore_miner::local_accounts::LocalAccounts; -use ethcore_miner::pool::{self, TransactionQueue, VerifiedTransaction, QueueStatus, PrioritizationStrategy}; +use ethcore_miner::pool::{self, TransactionQueue, VerifiedTransaction, QueueStatus, PrioritizationStrategy, TxStatus}; use ethcore_miner::service_transaction_checker::ServiceTransactionChecker; #[cfg(feature = "work-notify")] use ethcore_miner::work_notify::NotifyWork; use ethereum_types::{H256, U256, Address}; +use futures::sync::mpsc; use io::IoChannel; use miner::pool_client::{PoolClient, CachedNonceClient, NonceCache}; use miner; @@ -263,6 +264,13 @@ impl Miner { self.transaction_queue.add_listener(f); } + /// Set a callback to be notified + pub fn tx_pool_receiver(&self) -> mpsc::UnboundedReceiver>> { + let (sender, receiver) = mpsc::unbounded(); + self.transaction_queue.add_tx_pool_listener(sender); + receiver + } + /// Creates new instance of miner Arc. pub fn new( options: MinerOptions, diff --git a/miner/Cargo.toml b/miner/Cargo.toml index f7dfd8f08..4a017653b 100644 --- a/miner/Cargo.toml +++ b/miner/Cargo.toml @@ -31,6 +31,9 @@ parity-runtime = { path = "../util/runtime" } parking_lot = "0.7" price-info = { path = "./price-info", optional = true } rlp = { version = "0.3.0", features = ["ethereum"] } +serde = "1.0" +serde_derive = "1.0" +serde_json = "1.0" trace-time = "0.1" transaction-pool = "2.0" diff --git a/miner/src/lib.rs b/miner/src/lib.rs index 55091093a..5babec098 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -34,6 +34,7 @@ extern crate parking_lot; extern crate price_info; extern crate rlp; extern crate transaction_pool as txpool; +extern crate serde; #[macro_use] extern crate ethabi_contract; @@ -44,6 +45,8 @@ extern crate error_chain; #[macro_use] extern crate log; #[macro_use] +extern crate serde_derive; +#[macro_use] extern crate trace_time; #[cfg(test)] diff --git a/miner/src/pool/listener.rs b/miner/src/pool/listener.rs index 67034aa52..0f53afb83 100644 --- a/miner/src/pool/listener.rs +++ b/miner/src/pool/listener.rs @@ -20,9 +20,11 @@ use std::fmt; use std::sync::Arc; use ethereum_types::H256; +use futures::sync::mpsc; use txpool::{self, VerifiedTransaction}; use pool::VerifiedTransaction as Transaction; +use pool::TxStatus; type Listener = Box; @@ -116,6 +118,65 @@ impl txpool::Listener for Logger { } } +/// Transactions pool notifier +#[derive(Default)] +pub struct TransactionsPoolNotifier { + listeners: Vec>>>, + tx_statuses: Vec<(H256, TxStatus)>, +} + +impl TransactionsPoolNotifier { + /// Add new listener to receive notifications. + pub fn add(&mut self, f: mpsc::UnboundedSender>>) { + self.listeners.push(f); + } + + /// Notify listeners about all currently transactions. + pub fn notify(&mut self) { + if self.tx_statuses.is_empty() { + return; + } + + let to_send = Arc::new(std::mem::replace(&mut self.tx_statuses, Vec::new())); + self.listeners + .retain(|listener| listener.unbounded_send(to_send.clone()).is_ok()); + } +} + +impl fmt::Debug for TransactionsPoolNotifier { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("TransactionsPoolNotifier") + .field("listeners", &self.listeners.len()) + .finish() + } +} + +impl txpool::Listener for TransactionsPoolNotifier { + fn added(&mut self, tx: &Arc, _old: Option<&Arc>) { + self.tx_statuses.push((tx.hash.clone(), TxStatus::Added)); + } + + fn rejected(&mut self, tx: &Arc, _reason: &txpool::Error) { + self.tx_statuses.push((tx.hash.clone(), TxStatus::Rejected)); + } + + fn dropped(&mut self, tx: &Arc, _new: Option<&Transaction>) { + self.tx_statuses.push((tx.hash.clone(), TxStatus::Dropped)); + } + + fn invalid(&mut self, tx: &Arc) { + self.tx_statuses.push((tx.hash.clone(), TxStatus::Invalid)); + } + + fn canceled(&mut self, tx: &Arc) { + self.tx_statuses.push((tx.hash.clone(), TxStatus::Canceled)); + } + + fn culled(&mut self, tx: &Arc) { + self.tx_statuses.push((tx.hash.clone(), TxStatus::Culled)); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/miner/src/pool/mod.rs b/miner/src/pool/mod.rs index 40a226d9f..fcd3859e7 100644 --- a/miner/src/pool/mod.rs +++ b/miner/src/pool/mod.rs @@ -199,3 +199,21 @@ impl ScoredTransaction for VerifiedTransaction { self.transaction.nonce } } + +/// Pool transactions status +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum TxStatus { + /// Added transaction + Added, + /// Rejected transaction + Rejected, + /// Dropped transaction + Dropped, + /// Invalid transaction + Invalid, + /// Canceled transaction + Canceled, + /// Culled transaction + Culled, +} diff --git a/miner/src/pool/queue.rs b/miner/src/pool/queue.rs index ad7c9e6f1..2d8046bd9 100644 --- a/miner/src/pool/queue.rs +++ b/miner/src/pool/queue.rs @@ -22,17 +22,18 @@ use std::sync::atomic::{self, AtomicUsize}; use std::collections::{BTreeMap, BTreeSet, HashMap}; use ethereum_types::{H256, U256, Address}; +use futures::sync::mpsc; use parking_lot::RwLock; use txpool::{self, Verifier}; use types::transaction; use pool::{ self, replace, scoring, verifier, client, ready, listener, - PrioritizationStrategy, PendingOrdering, PendingSettings, + PrioritizationStrategy, PendingOrdering, PendingSettings, TxStatus }; use pool::local_transactions::LocalTransactionsList; -type Listener = (LocalTransactionsList, (listener::Notifier, listener::Logger)); +type Listener = (LocalTransactionsList, (listener::Notifier, (listener::Logger, listener::TransactionsPoolNotifier))); type Pool = txpool::Pool; /// Max cache time in milliseconds for pending transactions. @@ -304,6 +305,8 @@ impl TransactionQueue { // Notify about imported transactions. (self.pool.write().listener_mut().1).0.notify(); + ((self.pool.write().listener_mut().1).1).1.notify(); + if results.iter().any(|r| r.is_ok()) { self.cached_pending.write().clear(); } @@ -574,6 +577,12 @@ impl TransactionQueue { (pool.listener_mut().1).0.add(f); } + /// Add a listener to be notified about all transactions the pool + pub fn add_tx_pool_listener(&self, f: mpsc::UnboundedSender>>) { + let mut pool = self.pool.write(); + ((pool.listener_mut().1).1).1.add(f); + } + /// Check if pending set is cached. #[cfg(test)] pub fn is_pending_cached(&self) -> bool { diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index 6735f9685..a2918f85e 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -498,7 +498,7 @@ usage! { "--jsonrpc-interface=[IP]", "Specify the hostname portion of the HTTP JSON-RPC API server, IP should be an interface's IP address, or all (all interfaces) or local.", - ARG arg_jsonrpc_apis: (String) = "web3,eth,pubsub,net,parity,private,parity_pubsub,traces,rpc,shh,shh_pubsub", or |c: &Config| c.rpc.as_ref()?.apis.as_ref().map(|vec| vec.join(",")), + ARG arg_jsonrpc_apis: (String) = "web3,eth,pubsub,net,parity,private,parity_pubsub,traces,rpc,shh,shh_pubsub,parity_transactions_pool", or |c: &Config| c.rpc.as_ref()?.apis.as_ref().map(|vec| vec.join(",")), "--jsonrpc-apis=[APIS]", "Specify the APIs available through the HTTP JSON-RPC interface using a comma-delimited list of API names. Possible names are: all, safe, debug, web3, net, eth, pubsub, personal, signer, parity, parity_pubsub, parity_accounts, parity_set, traces, rpc, secretstore, shh, shh_pubsub. You can also disable a specific API by putting '-' in the front, example: all,-personal. 'safe' enables the following APIs: web3, net, eth, pubsub, parity, parity_pubsub, traces, rpc, shh, shh_pubsub", @@ -539,7 +539,7 @@ usage! { "--ws-interface=[IP]", "Specify the hostname portion of the WebSockets JSON-RPC server, IP should be an interface's IP address, or all (all interfaces) or local.", - ARG arg_ws_apis: (String) = "web3,eth,pubsub,net,parity,parity_pubsub,private,traces,rpc,shh,shh_pubsub", or |c: &Config| c.websockets.as_ref()?.apis.as_ref().map(|vec| vec.join(",")), + ARG arg_ws_apis: (String) = "web3,eth,pubsub,net,parity,parity_pubsub,private,traces,rpc,shh,shh_pubsub,parity_transactions_pool", or |c: &Config| c.websockets.as_ref()?.apis.as_ref().map(|vec| vec.join(",")), "--ws-apis=[APIS]", "Specify the JSON-RPC APIs available through the WebSockets interface using a comma-delimited list of API names. Possible names are: all, safe, web3, net, eth, pubsub, personal, signer, parity, parity_pubsub, parity_accounts, parity_set, traces, rpc, secretstore, shh, shh_pubsub. You can also disable a specific API by putting '-' in the front, example: all,-personal. 'safe' enables the following APIs: web3, net, eth, pubsub, parity, parity_pubsub, traces, rpc, shh, shh_pubsub", @@ -564,7 +564,7 @@ usage! { "--ipc-path=[PATH]", "Specify custom path for JSON-RPC over IPC service.", - ARG arg_ipc_apis: (String) = "web3,eth,pubsub,net,parity,parity_pubsub,parity_accounts,private,traces,rpc,shh,shh_pubsub", or |c: &Config| c.ipc.as_ref()?.apis.as_ref().map(|vec| vec.join(",")), + ARG arg_ipc_apis: (String) = "web3,eth,pubsub,net,parity,parity_pubsub,parity_accounts,private,traces,rpc,shh,shh_pubsub,parity_transactions_pool", or |c: &Config| c.ipc.as_ref()?.apis.as_ref().map(|vec| vec.join(",")), "--ipc-apis=[APIS]", "Specify custom API set available via JSON-RPC over IPC using a comma-delimited list of API names. Possible names are: all, safe, web3, net, eth, pubsub, personal, signer, parity, parity_pubsub, parity_accounts, parity_set, traces, rpc, secretstore, shh, shh_pubsub. You can also disable a specific API by putting '-' in the front, example: all,-personal. 'safe' enables the following APIs: web3, net, eth, pubsub, parity, parity_pubsub, traces, rpc, shh, shh_pubsub", diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index cc85af71f..40dab3f5a 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -38,6 +38,7 @@ use miner::external::ExternalMiner; use parity_rpc::dispatch::{FullDispatcher, LightDispatcher}; use parity_rpc::informant::{ActivityNotifier, ClientNotifier}; use parity_rpc::{Host, Metadata, NetworkSettings}; +use parity_rpc::v1::traits::TransactionsPool; use parity_runtime::Executor; use parking_lot::{Mutex, RwLock}; use sync::{LightSync, ManageNetwork, SyncProvider}; @@ -82,6 +83,8 @@ pub enum Api { /// Geth-compatible (best-effort) debug API (Potentially UNSAFE) /// NOTE We don't aim to support all methods, only the ones that are useful. Debug, + /// Parity Transactions pool PubSub + ParityTransactionsPool, } impl FromStr for Api { @@ -108,6 +111,7 @@ impl FromStr for Api { "signer" => Ok(Signer), "traces" => Ok(Traces), "web3" => Ok(Web3), + "parity_transactions_pool" => Ok(ParityTransactionsPool), api => Err(format!("Unknown api: {}", api)), } } @@ -191,6 +195,7 @@ fn to_modules(apis: &HashSet) -> BTreeMap { Api::Web3 => ("web3", "1.0"), Api::Whisper => ("shh", "1.0"), Api::WhisperPubSub => ("shh_pubsub", "1.0"), + Api::ParityTransactionsPool => ("parity_transactions_pool", "1.0"), }; modules.insert(name.into(), version.into()); } @@ -352,6 +357,13 @@ impl FullDependencies { handler.extend_with(client.to_delegate()); } } + Api::ParityTransactionsPool => { + if !for_generic_pubsub { + let receiver = self.miner.tx_pool_receiver(); + let client = TransactionsPoolClient::new(self.executor.clone(), receiver); + handler.extend_with(TransactionsPoolClient::to_delegate(client)); + } + } Api::Personal => { #[cfg(feature = "accounts")] handler.extend_with( @@ -603,6 +615,13 @@ impl LightDependencies { })); handler.extend_with(EthPubSub::to_delegate(client)); } + Api::ParityTransactionsPool => { + if !for_generic_pubsub { + let receiver = self.transaction_queue.write().tx_statuses_receiver(); + let client = TransactionsPoolClient::new(self.executor.clone(), receiver); + handler.extend_with(TransactionsPoolClient::to_delegate(client)); + } + } Api::Personal => { #[cfg(feature = "accounts")] handler.extend_with( @@ -744,12 +763,14 @@ impl ApiSet { ApiSet::UnsafeContext => { public_list.insert(Api::Traces); public_list.insert(Api::ParityPubSub); + public_list.insert(Api::ParityTransactionsPool); public_list } ApiSet::IpcContext => { public_list.insert(Api::Traces); public_list.insert(Api::ParityPubSub); public_list.insert(Api::ParityAccounts); + public_list.insert(Api::ParityTransactionsPool); public_list } ApiSet::All => { @@ -761,6 +782,7 @@ impl ApiSet { public_list.insert(Api::Signer); public_list.insert(Api::Personal); public_list.insert(Api::SecretStore); + public_list.insert(Api::ParityTransactionsPool); public_list } ApiSet::PubSub => [ @@ -769,6 +791,7 @@ impl ApiSet { Api::ParityAccounts, Api::ParitySet, Api::Traces, + Api::ParityTransactionsPool, ] .into_iter() .cloned() @@ -799,6 +822,7 @@ mod test { assert_eq!(Api::Private, "private".parse().unwrap()); assert_eq!(Api::Whisper, "shh".parse().unwrap()); assert_eq!(Api::WhisperPubSub, "shh_pubsub".parse().unwrap()); + assert_eq!(Api::ParityTransactionsPool, "parity_transactions_pool".parse().unwrap()); assert!("rp".parse::().is_err()); } @@ -830,6 +854,7 @@ mod test { Api::Whisper, Api::WhisperPubSub, Api::Private, + Api::ParityTransactionsPool, ].into_iter() .collect(); assert_eq!(ApiSet::UnsafeContext.list_apis(), expected); @@ -850,6 +875,7 @@ mod test { Api::Whisper, Api::WhisperPubSub, Api::Private, + Api::ParityTransactionsPool, // semi-safe Api::ParityAccounts, ].into_iter() @@ -880,6 +906,7 @@ mod test { Api::Personal, Api::Private, Api::Debug, + Api::ParityTransactionsPool, ].into_iter() .collect() ) @@ -908,6 +935,7 @@ mod test { Api::Signer, Api::Private, Api::Debug, + Api::ParityTransactionsPool, ].into_iter() .collect() ) @@ -931,6 +959,7 @@ mod test { Api::Whisper, Api::WhisperPubSub, Api::Private, + Api::ParityTransactionsPool, ].into_iter() .collect() ) diff --git a/rpc/src/v1/impls/mod.rs b/rpc/src/v1/impls/mod.rs index ba1cc100e..922f1e2ad 100644 --- a/rpc/src/v1/impls/mod.rs +++ b/rpc/src/v1/impls/mod.rs @@ -36,6 +36,7 @@ mod signer; mod signing; mod signing_unsafe; mod traces; +mod transactions_pool; mod web3; pub mod light; @@ -44,6 +45,7 @@ pub use self::debug::DebugClient; pub use self::eth::{EthClient, EthClientOptions}; pub use self::eth_filter::EthFilterClient; pub use self::eth_pubsub::EthPubSubClient; +pub use self::transactions_pool::TransactionsPoolClient; pub use self::net::NetClient; pub use self::parity::ParityClient; #[cfg(any(test, feature = "accounts"))] diff --git a/rpc/src/v1/impls/transactions_pool.rs b/rpc/src/v1/impls/transactions_pool.rs new file mode 100644 index 000000000..789395d7e --- /dev/null +++ b/rpc/src/v1/impls/transactions_pool.rs @@ -0,0 +1,101 @@ +use std::sync::{Arc, Weak}; + +use jsonrpc_core::Result; +use jsonrpc_core::futures::Future; +use jsonrpc_pubsub::{SubscriptionId, typed::{Sink, Subscriber}}; + +use v1::helpers::Subscribers; +use v1::metadata::Metadata; +use v1::traits::TransactionsPool; + +use miner::pool::TxStatus; +use parity_runtime::Executor; +use parking_lot::RwLock; +use ethereum_types::H256; +use futures::{Stream, sync::mpsc}; + +type Client = Sink<(H256, TxStatus)>; + +/// Transactions pool PubSub implementation. +pub struct TransactionsPoolClient { + handler: Arc, + transactions_pool_subscribers: Arc>>, +} + +impl TransactionsPoolClient { + /// Creates new `TransactionsPoolClient`. + pub fn new(executor: Executor, pool_receiver: mpsc::UnboundedReceiver>>) -> Self { + let transactions_pool_subscribers = Arc::new(RwLock::new(Subscribers::default())); + let handler = Arc::new( + TransactionsNotificationHandler::new( + executor.clone(), + transactions_pool_subscribers.clone(), + ) + ); + let handler2 = Arc::downgrade(&handler); + + executor.spawn(pool_receiver + .for_each(move |tx_status| { + if let Some(handler2) = handler2.upgrade() { + handler2.notify_transaction(tx_status); + } + Ok(()) + }) + .map_err(|e| warn!("Key server listener error: {:?}", e)) + ); + + TransactionsPoolClient { + handler, + transactions_pool_subscribers, + } + } + + /// Returns a chain notification handler. + pub fn handler(&self) -> Weak { + Arc::downgrade(&self.handler) + } +} + +/// Transactions pool PubSub Notification handler. +pub struct TransactionsNotificationHandler { + executor: Executor, + transactions_pool_subscribers: Arc>>, +} + +impl TransactionsNotificationHandler { + fn new(executor: Executor, transactions_pool_subscribers: Arc>>) -> Self { + TransactionsNotificationHandler { + executor, + transactions_pool_subscribers, + } + } + + fn notify(executor: &Executor, subscriber: &Client, result: (H256, TxStatus)) { + executor.spawn(subscriber + .notify(Ok(result)) + .map(|_| ()) + .map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e)) + ); + } + + pub fn notify_transaction(&self, tx_statuses: Arc>) { + for subscriber in self.transactions_pool_subscribers.read().values() { + for tx_status in tx_statuses.to_vec() { + Self::notify(&self.executor, subscriber, tx_status.clone()); + } + } + } +} + +impl TransactionsPool for TransactionsPoolClient { + type Metadata = Metadata; + + fn subscribe(&self, _meta: Metadata, subscriber: Subscriber<(H256, TxStatus)>) { + self.transactions_pool_subscribers.write().push(subscriber); + } + + fn unsubscribe(&self, _meta: Option, id: SubscriptionId) -> Result { + let res = self.transactions_pool_subscribers.write().remove(&id).is_some(); + Ok(res) + } +} diff --git a/rpc/src/v1/traits/mod.rs b/rpc/src/v1/traits/mod.rs index e25ca76ac..c92e724c2 100644 --- a/rpc/src/v1/traits/mod.rs +++ b/rpc/src/v1/traits/mod.rs @@ -32,6 +32,7 @@ pub mod rpc; pub mod secretstore; pub mod signer; pub mod traces; +pub mod transactions_pool; pub mod web3; pub use self::debug::Debug; @@ -50,4 +51,5 @@ pub use self::rpc::Rpc; pub use self::secretstore::SecretStore; pub use self::signer::Signer; pub use self::traces::Traces; +pub use self::transactions_pool::TransactionsPool; pub use self::web3::Web3; diff --git a/rpc/src/v1/traits/transactions_pool.rs b/rpc/src/v1/traits/transactions_pool.rs new file mode 100644 index 000000000..168148d6e --- /dev/null +++ b/rpc/src/v1/traits/transactions_pool.rs @@ -0,0 +1,23 @@ +//! Transactions pool PUB-SUB rpc interface. + +use jsonrpc_core::Result; +use jsonrpc_pubsub::{typed, SubscriptionId}; +use jsonrpc_derive::rpc; +use miner::pool::TxStatus; + +use ethereum_types::H256; + +/// Transactions Pool PUB-SUB rpc interface. +#[rpc] +pub trait TransactionsPool { + /// Pub/Sub Metadata + type Metadata; + + /// Subscribe to Transactions Pool subscription. + #[pubsub(subscription = "parity_watchTransactionsPool", subscribe, name = "parity_watchTransactionsPool")] + fn subscribe(&self, Self::Metadata, typed::Subscriber<(H256, TxStatus)>); + + /// Unsubscribe from existing Transactions Pool subscription. + #[pubsub(subscription = "parity_watchTransactionsPool", unsubscribe, name = "parity_unwatchTransactionsPool")] + fn unsubscribe(&self, Option, SubscriptionId) -> Result; +}