Watch transactions pool (#10558)

adds `parity_watchTransactionsPool`
This commit is contained in:
IntegralTeam 2019-04-12 18:36:49 +07:00 committed by Seun LanLege
parent 90a7ca9d10
commit 944bf6a59e
18 changed files with 297 additions and 8 deletions

5
Cargo.lock generated
View File

@ -729,6 +729,7 @@ dependencies = [
"ethkey 0.3.0", "ethkey 0.3.0",
"evm 0.1.0", "evm 0.1.0",
"fetch 0.1.0", "fetch 0.1.0",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"hash-db 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "hash-db 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"heapsize 0.4.2 (git+https://github.com/cheme/heapsize.git?branch=ec-macfix)", "heapsize 0.4.2 (git+https://github.com/cheme/heapsize.git?branch=ec-macfix)",
"itertools 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)", "itertools 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)",
@ -876,6 +877,7 @@ dependencies = [
"ethcore-blockchain 0.1.0", "ethcore-blockchain 0.1.0",
"ethcore-db 0.1.0", "ethcore-db 0.1.0",
"ethcore-io 1.12.0", "ethcore-io 1.12.0",
"ethcore-miner 1.12.0",
"ethcore-network 1.12.0", "ethcore-network 1.12.0",
"ethereum-types 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "ethereum-types 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"failsafe 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "failsafe 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -950,6 +952,9 @@ dependencies = [
"price-info 1.12.0", "price-info 1.12.0",
"rlp 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "rlp 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)",
"trace-time 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "trace-time 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"transaction-pool 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "transaction-pool 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -30,6 +30,7 @@ ethereum-types = "0.4"
ethjson = { path = "../json" } ethjson = { path = "../json" }
ethkey = { path = "../accounts/ethkey" } ethkey = { path = "../accounts/ethkey" }
evm = { path = "evm" } evm = { path = "evm" }
futures = "0.1"
hash-db = "0.11.0" hash-db = "0.11.0"
heapsize = "0.4" heapsize = "0.4"
itertools = "0.5" itertools = "0.5"

View File

@ -18,6 +18,7 @@ memory-db = "0.11.0"
trie-db = "0.11.0" trie-db = "0.11.0"
patricia-trie-ethereum = { path = "../../util/patricia-trie-ethereum" } patricia-trie-ethereum = { path = "../../util/patricia-trie-ethereum" }
ethcore-network = { path = "../../util/network" } ethcore-network = { path = "../../util/network" }
ethcore-miner = { path = "../../miner" }
ethcore-io = { path = "../../util/io" } ethcore-io = { path = "../../util/io" }
hash-db = "0.11.0" hash-db = "0.11.0"
heapsize = "0.4" heapsize = "0.4"

View File

@ -61,6 +61,7 @@ extern crate ethcore_io as io;
extern crate ethcore_network as network; extern crate ethcore_network as network;
extern crate parity_bytes as bytes; extern crate parity_bytes as bytes;
extern crate ethereum_types; extern crate ethereum_types;
extern crate ethcore_miner as miner;
extern crate ethcore; extern crate ethcore;
extern crate hash_db; extern crate hash_db;
extern crate heapsize; extern crate heapsize;

View File

@ -24,12 +24,15 @@
//! address-wise manner. //! address-wise manner.
use std::fmt; use std::fmt;
use std::sync::Arc;
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use common_types::transaction::{self, Condition, PendingTransaction, SignedTransaction}; use common_types::transaction::{self, Condition, PendingTransaction, SignedTransaction};
use ethereum_types::{H256, U256, Address}; use ethereum_types::{H256, U256, Address};
use fastmap::H256FastMap; use fastmap::H256FastMap;
use futures::sync::mpsc;
use miner::pool::TxStatus;
// Knowledge of an account's current nonce. // Knowledge of an account's current nonce.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
@ -134,6 +137,7 @@ pub struct TransactionQueue {
by_account: HashMap<Address, AccountTransactions>, by_account: HashMap<Address, AccountTransactions>,
by_hash: H256FastMap<PendingTransaction>, by_hash: H256FastMap<PendingTransaction>,
listeners: Vec<Listener>, listeners: Vec<Listener>,
tx_statuses_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
} }
impl fmt::Debug for TransactionQueue { impl fmt::Debug for TransactionQueue {
@ -231,7 +235,7 @@ impl TransactionQueue {
}; };
self.by_hash.insert(hash, tx); self.by_hash.insert(hash, tx);
self.notify(&promoted); self.notify(&promoted, TxStatus::Added);
Ok(res) Ok(res)
} }
@ -343,6 +347,8 @@ impl TransactionQueue {
trace!(target: "txqueue", "Culled {} old transactions from sender {} (nonce={})", trace!(target: "txqueue", "Culled {} old transactions from sender {} (nonce={})",
removed_hashes.len(), address, cur_nonce); removed_hashes.len(), address, cur_nonce);
self.notify(&removed_hashes, TxStatus::Culled);
for hash in removed_hashes { for hash in removed_hashes {
self.by_hash.remove(&hash); self.by_hash.remove(&hash);
} }
@ -358,11 +364,26 @@ impl TransactionQueue {
self.listeners.push(f); self.listeners.push(f);
} }
/// Add a transaction queue listener.
pub fn tx_statuses_receiver(&mut self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
let (sender, receiver) = mpsc::unbounded();
self.tx_statuses_listeners.push(sender);
receiver
}
/// Notifies all listeners about new pending transaction. /// Notifies all listeners about new pending transaction.
fn notify(&self, hashes: &[H256]) { fn notify(&mut self, hashes: &[H256], status: TxStatus) {
for listener in &self.listeners { for listener in &self.listeners {
listener(hashes) listener(hashes)
} }
let to_send: Arc<Vec<(H256, TxStatus)>> = Arc::new(
hashes
.into_iter()
.map(|hash| (hash.clone(), status)).collect()
);
self.tx_statuses_listeners.retain(| listener| listener.unbounded_send(to_send.clone()).is_ok());
} }
} }

View File

@ -74,6 +74,7 @@ extern crate ethcore_miner;
extern crate ethereum_types; extern crate ethereum_types;
extern crate ethjson; extern crate ethjson;
extern crate ethkey; extern crate ethkey;
extern crate futures;
extern crate hash_db; extern crate hash_db;
extern crate heapsize; extern crate heapsize;
extern crate itertools; extern crate itertools;

View File

@ -24,11 +24,12 @@ use bytes::Bytes;
use call_contract::CallContract; use call_contract::CallContract;
use ethcore_miner::gas_pricer::GasPricer; use ethcore_miner::gas_pricer::GasPricer;
use ethcore_miner::local_accounts::LocalAccounts; use ethcore_miner::local_accounts::LocalAccounts;
use ethcore_miner::pool::{self, TransactionQueue, VerifiedTransaction, QueueStatus, PrioritizationStrategy}; use ethcore_miner::pool::{self, TransactionQueue, VerifiedTransaction, QueueStatus, PrioritizationStrategy, TxStatus};
use ethcore_miner::service_transaction_checker::ServiceTransactionChecker; use ethcore_miner::service_transaction_checker::ServiceTransactionChecker;
#[cfg(feature = "work-notify")] #[cfg(feature = "work-notify")]
use ethcore_miner::work_notify::NotifyWork; use ethcore_miner::work_notify::NotifyWork;
use ethereum_types::{H256, U256, Address}; use ethereum_types::{H256, U256, Address};
use futures::sync::mpsc;
use io::IoChannel; use io::IoChannel;
use miner::pool_client::{PoolClient, CachedNonceClient, NonceCache}; use miner::pool_client::{PoolClient, CachedNonceClient, NonceCache};
use miner; use miner;
@ -263,6 +264,13 @@ impl Miner {
self.transaction_queue.add_listener(f); self.transaction_queue.add_listener(f);
} }
/// Set a callback to be notified
pub fn tx_pool_receiver(&self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
let (sender, receiver) = mpsc::unbounded();
self.transaction_queue.add_tx_pool_listener(sender);
receiver
}
/// Creates new instance of miner Arc. /// Creates new instance of miner Arc.
pub fn new<A: LocalAccounts + 'static>( pub fn new<A: LocalAccounts + 'static>(
options: MinerOptions, options: MinerOptions,

View File

@ -31,6 +31,9 @@ parity-runtime = { path = "../util/runtime" }
parking_lot = "0.7" parking_lot = "0.7"
price-info = { path = "./price-info", optional = true } price-info = { path = "./price-info", optional = true }
rlp = { version = "0.3.0", features = ["ethereum"] } rlp = { version = "0.3.0", features = ["ethereum"] }
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
trace-time = "0.1" trace-time = "0.1"
transaction-pool = "2.0" transaction-pool = "2.0"

View File

@ -34,6 +34,7 @@ extern crate parking_lot;
extern crate price_info; extern crate price_info;
extern crate rlp; extern crate rlp;
extern crate transaction_pool as txpool; extern crate transaction_pool as txpool;
extern crate serde;
#[macro_use] #[macro_use]
extern crate ethabi_contract; extern crate ethabi_contract;
@ -44,6 +45,8 @@ extern crate error_chain;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use] #[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate trace_time; extern crate trace_time;
#[cfg(test)] #[cfg(test)]

View File

@ -20,9 +20,11 @@ use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use ethereum_types::H256; use ethereum_types::H256;
use futures::sync::mpsc;
use txpool::{self, VerifiedTransaction}; use txpool::{self, VerifiedTransaction};
use pool::VerifiedTransaction as Transaction; use pool::VerifiedTransaction as Transaction;
use pool::TxStatus;
type Listener = Box<Fn(&[H256]) + Send + Sync>; type Listener = Box<Fn(&[H256]) + Send + Sync>;
@ -116,6 +118,65 @@ impl txpool::Listener<Transaction> for Logger {
} }
} }
/// Transactions pool notifier
#[derive(Default)]
pub struct TransactionsPoolNotifier {
listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
tx_statuses: Vec<(H256, TxStatus)>,
}
impl TransactionsPoolNotifier {
/// Add new listener to receive notifications.
pub fn add(&mut self, f: mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>) {
self.listeners.push(f);
}
/// Notify listeners about all currently transactions.
pub fn notify(&mut self) {
if self.tx_statuses.is_empty() {
return;
}
let to_send = Arc::new(std::mem::replace(&mut self.tx_statuses, Vec::new()));
self.listeners
.retain(|listener| listener.unbounded_send(to_send.clone()).is_ok());
}
}
impl fmt::Debug for TransactionsPoolNotifier {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("TransactionsPoolNotifier")
.field("listeners", &self.listeners.len())
.finish()
}
}
impl txpool::Listener<Transaction> for TransactionsPoolNotifier {
fn added(&mut self, tx: &Arc<Transaction>, _old: Option<&Arc<Transaction>>) {
self.tx_statuses.push((tx.hash.clone(), TxStatus::Added));
}
fn rejected<H: fmt::Debug + fmt::LowerHex>(&mut self, tx: &Arc<Transaction>, _reason: &txpool::Error<H>) {
self.tx_statuses.push((tx.hash.clone(), TxStatus::Rejected));
}
fn dropped(&mut self, tx: &Arc<Transaction>, _new: Option<&Transaction>) {
self.tx_statuses.push((tx.hash.clone(), TxStatus::Dropped));
}
fn invalid(&mut self, tx: &Arc<Transaction>) {
self.tx_statuses.push((tx.hash.clone(), TxStatus::Invalid));
}
fn canceled(&mut self, tx: &Arc<Transaction>) {
self.tx_statuses.push((tx.hash.clone(), TxStatus::Canceled));
}
fn culled(&mut self, tx: &Arc<Transaction>) {
self.tx_statuses.push((tx.hash.clone(), TxStatus::Culled));
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -199,3 +199,21 @@ impl ScoredTransaction for VerifiedTransaction {
self.transaction.nonce self.transaction.nonce
} }
} }
/// Pool transactions status
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum TxStatus {
/// Added transaction
Added,
/// Rejected transaction
Rejected,
/// Dropped transaction
Dropped,
/// Invalid transaction
Invalid,
/// Canceled transaction
Canceled,
/// Culled transaction
Culled,
}

View File

@ -22,17 +22,18 @@ use std::sync::atomic::{self, AtomicUsize};
use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::collections::{BTreeMap, BTreeSet, HashMap};
use ethereum_types::{H256, U256, Address}; use ethereum_types::{H256, U256, Address};
use futures::sync::mpsc;
use parking_lot::RwLock; use parking_lot::RwLock;
use txpool::{self, Verifier}; use txpool::{self, Verifier};
use types::transaction; use types::transaction;
use pool::{ use pool::{
self, replace, scoring, verifier, client, ready, listener, self, replace, scoring, verifier, client, ready, listener,
PrioritizationStrategy, PendingOrdering, PendingSettings, PrioritizationStrategy, PendingOrdering, PendingSettings, TxStatus
}; };
use pool::local_transactions::LocalTransactionsList; use pool::local_transactions::LocalTransactionsList;
type Listener = (LocalTransactionsList, (listener::Notifier, listener::Logger)); type Listener = (LocalTransactionsList, (listener::Notifier, (listener::Logger, listener::TransactionsPoolNotifier)));
type Pool = txpool::Pool<pool::VerifiedTransaction, scoring::NonceAndGasPrice, Listener>; type Pool = txpool::Pool<pool::VerifiedTransaction, scoring::NonceAndGasPrice, Listener>;
/// Max cache time in milliseconds for pending transactions. /// Max cache time in milliseconds for pending transactions.
@ -304,6 +305,8 @@ impl TransactionQueue {
// Notify about imported transactions. // Notify about imported transactions.
(self.pool.write().listener_mut().1).0.notify(); (self.pool.write().listener_mut().1).0.notify();
((self.pool.write().listener_mut().1).1).1.notify();
if results.iter().any(|r| r.is_ok()) { if results.iter().any(|r| r.is_ok()) {
self.cached_pending.write().clear(); self.cached_pending.write().clear();
} }
@ -574,6 +577,12 @@ impl TransactionQueue {
(pool.listener_mut().1).0.add(f); (pool.listener_mut().1).0.add(f);
} }
/// Add a listener to be notified about all transactions the pool
pub fn add_tx_pool_listener(&self, f: mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>) {
let mut pool = self.pool.write();
((pool.listener_mut().1).1).1.add(f);
}
/// Check if pending set is cached. /// Check if pending set is cached.
#[cfg(test)] #[cfg(test)]
pub fn is_pending_cached(&self) -> bool { pub fn is_pending_cached(&self) -> bool {

View File

@ -498,7 +498,7 @@ usage! {
"--jsonrpc-interface=[IP]", "--jsonrpc-interface=[IP]",
"Specify the hostname portion of the HTTP JSON-RPC API server, IP should be an interface's IP address, or all (all interfaces) or local.", "Specify the hostname portion of the HTTP JSON-RPC API server, IP should be an interface's IP address, or all (all interfaces) or local.",
ARG arg_jsonrpc_apis: (String) = "web3,eth,pubsub,net,parity,private,parity_pubsub,traces,rpc,shh,shh_pubsub", or |c: &Config| c.rpc.as_ref()?.apis.as_ref().map(|vec| vec.join(",")), ARG arg_jsonrpc_apis: (String) = "web3,eth,pubsub,net,parity,private,parity_pubsub,traces,rpc,shh,shh_pubsub,parity_transactions_pool", or |c: &Config| c.rpc.as_ref()?.apis.as_ref().map(|vec| vec.join(",")),
"--jsonrpc-apis=[APIS]", "--jsonrpc-apis=[APIS]",
"Specify the APIs available through the HTTP JSON-RPC interface using a comma-delimited list of API names. Possible names are: all, safe, debug, web3, net, eth, pubsub, personal, signer, parity, parity_pubsub, parity_accounts, parity_set, traces, rpc, secretstore, shh, shh_pubsub. You can also disable a specific API by putting '-' in the front, example: all,-personal. 'safe' enables the following APIs: web3, net, eth, pubsub, parity, parity_pubsub, traces, rpc, shh, shh_pubsub", "Specify the APIs available through the HTTP JSON-RPC interface using a comma-delimited list of API names. Possible names are: all, safe, debug, web3, net, eth, pubsub, personal, signer, parity, parity_pubsub, parity_accounts, parity_set, traces, rpc, secretstore, shh, shh_pubsub. You can also disable a specific API by putting '-' in the front, example: all,-personal. 'safe' enables the following APIs: web3, net, eth, pubsub, parity, parity_pubsub, traces, rpc, shh, shh_pubsub",
@ -539,7 +539,7 @@ usage! {
"--ws-interface=[IP]", "--ws-interface=[IP]",
"Specify the hostname portion of the WebSockets JSON-RPC server, IP should be an interface's IP address, or all (all interfaces) or local.", "Specify the hostname portion of the WebSockets JSON-RPC server, IP should be an interface's IP address, or all (all interfaces) or local.",
ARG arg_ws_apis: (String) = "web3,eth,pubsub,net,parity,parity_pubsub,private,traces,rpc,shh,shh_pubsub", or |c: &Config| c.websockets.as_ref()?.apis.as_ref().map(|vec| vec.join(",")), ARG arg_ws_apis: (String) = "web3,eth,pubsub,net,parity,parity_pubsub,private,traces,rpc,shh,shh_pubsub,parity_transactions_pool", or |c: &Config| c.websockets.as_ref()?.apis.as_ref().map(|vec| vec.join(",")),
"--ws-apis=[APIS]", "--ws-apis=[APIS]",
"Specify the JSON-RPC APIs available through the WebSockets interface using a comma-delimited list of API names. Possible names are: all, safe, web3, net, eth, pubsub, personal, signer, parity, parity_pubsub, parity_accounts, parity_set, traces, rpc, secretstore, shh, shh_pubsub. You can also disable a specific API by putting '-' in the front, example: all,-personal. 'safe' enables the following APIs: web3, net, eth, pubsub, parity, parity_pubsub, traces, rpc, shh, shh_pubsub", "Specify the JSON-RPC APIs available through the WebSockets interface using a comma-delimited list of API names. Possible names are: all, safe, web3, net, eth, pubsub, personal, signer, parity, parity_pubsub, parity_accounts, parity_set, traces, rpc, secretstore, shh, shh_pubsub. You can also disable a specific API by putting '-' in the front, example: all,-personal. 'safe' enables the following APIs: web3, net, eth, pubsub, parity, parity_pubsub, traces, rpc, shh, shh_pubsub",
@ -564,7 +564,7 @@ usage! {
"--ipc-path=[PATH]", "--ipc-path=[PATH]",
"Specify custom path for JSON-RPC over IPC service.", "Specify custom path for JSON-RPC over IPC service.",
ARG arg_ipc_apis: (String) = "web3,eth,pubsub,net,parity,parity_pubsub,parity_accounts,private,traces,rpc,shh,shh_pubsub", or |c: &Config| c.ipc.as_ref()?.apis.as_ref().map(|vec| vec.join(",")), ARG arg_ipc_apis: (String) = "web3,eth,pubsub,net,parity,parity_pubsub,parity_accounts,private,traces,rpc,shh,shh_pubsub,parity_transactions_pool", or |c: &Config| c.ipc.as_ref()?.apis.as_ref().map(|vec| vec.join(",")),
"--ipc-apis=[APIS]", "--ipc-apis=[APIS]",
"Specify custom API set available via JSON-RPC over IPC using a comma-delimited list of API names. Possible names are: all, safe, web3, net, eth, pubsub, personal, signer, parity, parity_pubsub, parity_accounts, parity_set, traces, rpc, secretstore, shh, shh_pubsub. You can also disable a specific API by putting '-' in the front, example: all,-personal. 'safe' enables the following APIs: web3, net, eth, pubsub, parity, parity_pubsub, traces, rpc, shh, shh_pubsub", "Specify custom API set available via JSON-RPC over IPC using a comma-delimited list of API names. Possible names are: all, safe, web3, net, eth, pubsub, personal, signer, parity, parity_pubsub, parity_accounts, parity_set, traces, rpc, secretstore, shh, shh_pubsub. You can also disable a specific API by putting '-' in the front, example: all,-personal. 'safe' enables the following APIs: web3, net, eth, pubsub, parity, parity_pubsub, traces, rpc, shh, shh_pubsub",

View File

@ -38,6 +38,7 @@ use miner::external::ExternalMiner;
use parity_rpc::dispatch::{FullDispatcher, LightDispatcher}; use parity_rpc::dispatch::{FullDispatcher, LightDispatcher};
use parity_rpc::informant::{ActivityNotifier, ClientNotifier}; use parity_rpc::informant::{ActivityNotifier, ClientNotifier};
use parity_rpc::{Host, Metadata, NetworkSettings}; use parity_rpc::{Host, Metadata, NetworkSettings};
use parity_rpc::v1::traits::TransactionsPool;
use parity_runtime::Executor; use parity_runtime::Executor;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use sync::{LightSync, ManageNetwork, SyncProvider}; use sync::{LightSync, ManageNetwork, SyncProvider};
@ -82,6 +83,8 @@ pub enum Api {
/// Geth-compatible (best-effort) debug API (Potentially UNSAFE) /// Geth-compatible (best-effort) debug API (Potentially UNSAFE)
/// NOTE We don't aim to support all methods, only the ones that are useful. /// NOTE We don't aim to support all methods, only the ones that are useful.
Debug, Debug,
/// Parity Transactions pool PubSub
ParityTransactionsPool,
} }
impl FromStr for Api { impl FromStr for Api {
@ -108,6 +111,7 @@ impl FromStr for Api {
"signer" => Ok(Signer), "signer" => Ok(Signer),
"traces" => Ok(Traces), "traces" => Ok(Traces),
"web3" => Ok(Web3), "web3" => Ok(Web3),
"parity_transactions_pool" => Ok(ParityTransactionsPool),
api => Err(format!("Unknown api: {}", api)), api => Err(format!("Unknown api: {}", api)),
} }
} }
@ -191,6 +195,7 @@ fn to_modules(apis: &HashSet<Api>) -> BTreeMap<String, String> {
Api::Web3 => ("web3", "1.0"), Api::Web3 => ("web3", "1.0"),
Api::Whisper => ("shh", "1.0"), Api::Whisper => ("shh", "1.0"),
Api::WhisperPubSub => ("shh_pubsub", "1.0"), Api::WhisperPubSub => ("shh_pubsub", "1.0"),
Api::ParityTransactionsPool => ("parity_transactions_pool", "1.0"),
}; };
modules.insert(name.into(), version.into()); modules.insert(name.into(), version.into());
} }
@ -352,6 +357,13 @@ impl FullDependencies {
handler.extend_with(client.to_delegate()); handler.extend_with(client.to_delegate());
} }
} }
Api::ParityTransactionsPool => {
if !for_generic_pubsub {
let receiver = self.miner.tx_pool_receiver();
let client = TransactionsPoolClient::new(self.executor.clone(), receiver);
handler.extend_with(TransactionsPoolClient::to_delegate(client));
}
}
Api::Personal => { Api::Personal => {
#[cfg(feature = "accounts")] #[cfg(feature = "accounts")]
handler.extend_with( handler.extend_with(
@ -603,6 +615,13 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
})); }));
handler.extend_with(EthPubSub::to_delegate(client)); handler.extend_with(EthPubSub::to_delegate(client));
} }
Api::ParityTransactionsPool => {
if !for_generic_pubsub {
let receiver = self.transaction_queue.write().tx_statuses_receiver();
let client = TransactionsPoolClient::new(self.executor.clone(), receiver);
handler.extend_with(TransactionsPoolClient::to_delegate(client));
}
}
Api::Personal => { Api::Personal => {
#[cfg(feature = "accounts")] #[cfg(feature = "accounts")]
handler.extend_with( handler.extend_with(
@ -744,12 +763,14 @@ impl ApiSet {
ApiSet::UnsafeContext => { ApiSet::UnsafeContext => {
public_list.insert(Api::Traces); public_list.insert(Api::Traces);
public_list.insert(Api::ParityPubSub); public_list.insert(Api::ParityPubSub);
public_list.insert(Api::ParityTransactionsPool);
public_list public_list
} }
ApiSet::IpcContext => { ApiSet::IpcContext => {
public_list.insert(Api::Traces); public_list.insert(Api::Traces);
public_list.insert(Api::ParityPubSub); public_list.insert(Api::ParityPubSub);
public_list.insert(Api::ParityAccounts); public_list.insert(Api::ParityAccounts);
public_list.insert(Api::ParityTransactionsPool);
public_list public_list
} }
ApiSet::All => { ApiSet::All => {
@ -761,6 +782,7 @@ impl ApiSet {
public_list.insert(Api::Signer); public_list.insert(Api::Signer);
public_list.insert(Api::Personal); public_list.insert(Api::Personal);
public_list.insert(Api::SecretStore); public_list.insert(Api::SecretStore);
public_list.insert(Api::ParityTransactionsPool);
public_list public_list
} }
ApiSet::PubSub => [ ApiSet::PubSub => [
@ -769,6 +791,7 @@ impl ApiSet {
Api::ParityAccounts, Api::ParityAccounts,
Api::ParitySet, Api::ParitySet,
Api::Traces, Api::Traces,
Api::ParityTransactionsPool,
] ]
.into_iter() .into_iter()
.cloned() .cloned()
@ -799,6 +822,7 @@ mod test {
assert_eq!(Api::Private, "private".parse().unwrap()); assert_eq!(Api::Private, "private".parse().unwrap());
assert_eq!(Api::Whisper, "shh".parse().unwrap()); assert_eq!(Api::Whisper, "shh".parse().unwrap());
assert_eq!(Api::WhisperPubSub, "shh_pubsub".parse().unwrap()); assert_eq!(Api::WhisperPubSub, "shh_pubsub".parse().unwrap());
assert_eq!(Api::ParityTransactionsPool, "parity_transactions_pool".parse().unwrap());
assert!("rp".parse::<Api>().is_err()); assert!("rp".parse::<Api>().is_err());
} }
@ -830,6 +854,7 @@ mod test {
Api::Whisper, Api::Whisper,
Api::WhisperPubSub, Api::WhisperPubSub,
Api::Private, Api::Private,
Api::ParityTransactionsPool,
].into_iter() ].into_iter()
.collect(); .collect();
assert_eq!(ApiSet::UnsafeContext.list_apis(), expected); assert_eq!(ApiSet::UnsafeContext.list_apis(), expected);
@ -850,6 +875,7 @@ mod test {
Api::Whisper, Api::Whisper,
Api::WhisperPubSub, Api::WhisperPubSub,
Api::Private, Api::Private,
Api::ParityTransactionsPool,
// semi-safe // semi-safe
Api::ParityAccounts, Api::ParityAccounts,
].into_iter() ].into_iter()
@ -880,6 +906,7 @@ mod test {
Api::Personal, Api::Personal,
Api::Private, Api::Private,
Api::Debug, Api::Debug,
Api::ParityTransactionsPool,
].into_iter() ].into_iter()
.collect() .collect()
) )
@ -908,6 +935,7 @@ mod test {
Api::Signer, Api::Signer,
Api::Private, Api::Private,
Api::Debug, Api::Debug,
Api::ParityTransactionsPool,
].into_iter() ].into_iter()
.collect() .collect()
) )
@ -931,6 +959,7 @@ mod test {
Api::Whisper, Api::Whisper,
Api::WhisperPubSub, Api::WhisperPubSub,
Api::Private, Api::Private,
Api::ParityTransactionsPool,
].into_iter() ].into_iter()
.collect() .collect()
) )

View File

@ -36,6 +36,7 @@ mod signer;
mod signing; mod signing;
mod signing_unsafe; mod signing_unsafe;
mod traces; mod traces;
mod transactions_pool;
mod web3; mod web3;
pub mod light; pub mod light;
@ -44,6 +45,7 @@ pub use self::debug::DebugClient;
pub use self::eth::{EthClient, EthClientOptions}; pub use self::eth::{EthClient, EthClientOptions};
pub use self::eth_filter::EthFilterClient; pub use self::eth_filter::EthFilterClient;
pub use self::eth_pubsub::EthPubSubClient; pub use self::eth_pubsub::EthPubSubClient;
pub use self::transactions_pool::TransactionsPoolClient;
pub use self::net::NetClient; pub use self::net::NetClient;
pub use self::parity::ParityClient; pub use self::parity::ParityClient;
#[cfg(any(test, feature = "accounts"))] #[cfg(any(test, feature = "accounts"))]

View File

@ -0,0 +1,101 @@
use std::sync::{Arc, Weak};
use jsonrpc_core::Result;
use jsonrpc_core::futures::Future;
use jsonrpc_pubsub::{SubscriptionId, typed::{Sink, Subscriber}};
use v1::helpers::Subscribers;
use v1::metadata::Metadata;
use v1::traits::TransactionsPool;
use miner::pool::TxStatus;
use parity_runtime::Executor;
use parking_lot::RwLock;
use ethereum_types::H256;
use futures::{Stream, sync::mpsc};
type Client = Sink<(H256, TxStatus)>;
/// Transactions pool PubSub implementation.
pub struct TransactionsPoolClient {
handler: Arc<TransactionsNotificationHandler>,
transactions_pool_subscribers: Arc<RwLock<Subscribers<Client>>>,
}
impl TransactionsPoolClient {
/// Creates new `TransactionsPoolClient`.
pub fn new(executor: Executor, pool_receiver: mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>>) -> Self {
let transactions_pool_subscribers = Arc::new(RwLock::new(Subscribers::default()));
let handler = Arc::new(
TransactionsNotificationHandler::new(
executor.clone(),
transactions_pool_subscribers.clone(),
)
);
let handler2 = Arc::downgrade(&handler);
executor.spawn(pool_receiver
.for_each(move |tx_status| {
if let Some(handler2) = handler2.upgrade() {
handler2.notify_transaction(tx_status);
}
Ok(())
})
.map_err(|e| warn!("Key server listener error: {:?}", e))
);
TransactionsPoolClient {
handler,
transactions_pool_subscribers,
}
}
/// Returns a chain notification handler.
pub fn handler(&self) -> Weak<TransactionsNotificationHandler> {
Arc::downgrade(&self.handler)
}
}
/// Transactions pool PubSub Notification handler.
pub struct TransactionsNotificationHandler {
executor: Executor,
transactions_pool_subscribers: Arc<RwLock<Subscribers<Client>>>,
}
impl TransactionsNotificationHandler {
fn new(executor: Executor, transactions_pool_subscribers: Arc<RwLock<Subscribers<Client>>>) -> Self {
TransactionsNotificationHandler {
executor,
transactions_pool_subscribers,
}
}
fn notify(executor: &Executor, subscriber: &Client, result: (H256, TxStatus)) {
executor.spawn(subscriber
.notify(Ok(result))
.map(|_| ())
.map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e))
);
}
pub fn notify_transaction(&self, tx_statuses: Arc<Vec<(H256, TxStatus)>>) {
for subscriber in self.transactions_pool_subscribers.read().values() {
for tx_status in tx_statuses.to_vec() {
Self::notify(&self.executor, subscriber, tx_status.clone());
}
}
}
}
impl TransactionsPool for TransactionsPoolClient {
type Metadata = Metadata;
fn subscribe(&self, _meta: Metadata, subscriber: Subscriber<(H256, TxStatus)>) {
self.transactions_pool_subscribers.write().push(subscriber);
}
fn unsubscribe(&self, _meta: Option<Metadata>, id: SubscriptionId) -> Result<bool> {
let res = self.transactions_pool_subscribers.write().remove(&id).is_some();
Ok(res)
}
}

View File

@ -32,6 +32,7 @@ pub mod rpc;
pub mod secretstore; pub mod secretstore;
pub mod signer; pub mod signer;
pub mod traces; pub mod traces;
pub mod transactions_pool;
pub mod web3; pub mod web3;
pub use self::debug::Debug; pub use self::debug::Debug;
@ -50,4 +51,5 @@ pub use self::rpc::Rpc;
pub use self::secretstore::SecretStore; pub use self::secretstore::SecretStore;
pub use self::signer::Signer; pub use self::signer::Signer;
pub use self::traces::Traces; pub use self::traces::Traces;
pub use self::transactions_pool::TransactionsPool;
pub use self::web3::Web3; pub use self::web3::Web3;

View File

@ -0,0 +1,23 @@
//! Transactions pool PUB-SUB rpc interface.
use jsonrpc_core::Result;
use jsonrpc_pubsub::{typed, SubscriptionId};
use jsonrpc_derive::rpc;
use miner::pool::TxStatus;
use ethereum_types::H256;
/// Transactions Pool PUB-SUB rpc interface.
#[rpc]
pub trait TransactionsPool {
/// Pub/Sub Metadata
type Metadata;
/// Subscribe to Transactions Pool subscription.
#[pubsub(subscription = "parity_watchTransactionsPool", subscribe, name = "parity_watchTransactionsPool")]
fn subscribe(&self, Self::Metadata, typed::Subscriber<(H256, TxStatus)>);
/// Unsubscribe from existing Transactions Pool subscription.
#[pubsub(subscription = "parity_watchTransactionsPool", unsubscribe, name = "parity_unwatchTransactionsPool")]
fn unsubscribe(&self, Option<Self::Metadata>, SubscriptionId) -> Result<bool>;
}