Merge Notifier
and TransactionsPoolNotifier
(#10591)
* Merge `Notifier` and `TransactionsPoolNotifier` * fix tests
This commit is contained in:
parent
425dcd45c2
commit
faf6f1f9ea
@ -129,15 +129,13 @@ pub enum ImportDestination {
|
|||||||
Future,
|
Future,
|
||||||
}
|
}
|
||||||
|
|
||||||
type Listener = Box<Fn(&[H256]) + Send + Sync>;
|
|
||||||
|
|
||||||
/// Light transaction queue. See module docs for more details.
|
/// Light transaction queue. See module docs for more details.
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct TransactionQueue {
|
pub struct TransactionQueue {
|
||||||
by_account: HashMap<Address, AccountTransactions>,
|
by_account: HashMap<Address, AccountTransactions>,
|
||||||
by_hash: H256FastMap<PendingTransaction>,
|
by_hash: H256FastMap<PendingTransaction>,
|
||||||
listeners: Vec<Listener>,
|
pending_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<H256>>>>,
|
||||||
tx_statuses_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
|
full_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Debug for TransactionQueue {
|
impl fmt::Debug for TransactionQueue {
|
||||||
@ -145,7 +143,8 @@ impl fmt::Debug for TransactionQueue {
|
|||||||
fmt.debug_struct("TransactionQueue")
|
fmt.debug_struct("TransactionQueue")
|
||||||
.field("by_account", &self.by_account)
|
.field("by_account", &self.by_account)
|
||||||
.field("by_hash", &self.by_hash)
|
.field("by_hash", &self.by_hash)
|
||||||
.field("listeners", &self.listeners.len())
|
.field("pending_listeners", &self.pending_listeners.len())
|
||||||
|
.field("full_listeners", &self.pending_listeners.len())
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -360,30 +359,40 @@ impl TransactionQueue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Add a transaction queue listener.
|
/// Add a transaction queue listener.
|
||||||
pub fn add_listener(&mut self, f: Listener) {
|
pub fn pending_transactions_receiver(&mut self) -> mpsc::UnboundedReceiver<Arc<Vec<H256>>> {
|
||||||
self.listeners.push(f);
|
let (sender, receiver) = mpsc::unbounded();
|
||||||
|
self.pending_listeners.push(sender);
|
||||||
|
receiver
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a transaction queue listener.
|
/// Add a transaction queue listener.
|
||||||
pub fn tx_statuses_receiver(&mut self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
|
pub fn full_transactions_receiver(&mut self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
|
||||||
let (sender, receiver) = mpsc::unbounded();
|
let (sender, receiver) = mpsc::unbounded();
|
||||||
self.tx_statuses_listeners.push(sender);
|
self.full_listeners.push(sender);
|
||||||
receiver
|
receiver
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Notifies all listeners about new pending transaction.
|
/// Notifies all listeners about new pending transaction.
|
||||||
fn notify(&mut self, hashes: &[H256], status: TxStatus) {
|
fn notify(&mut self, hashes: &[H256], status: TxStatus) {
|
||||||
for listener in &self.listeners {
|
if status == TxStatus::Added {
|
||||||
listener(hashes)
|
let to_pending_send: Arc<Vec<H256>> = Arc::new(
|
||||||
}
|
|
||||||
|
|
||||||
let to_send: Arc<Vec<(H256, TxStatus)>> = Arc::new(
|
|
||||||
hashes
|
hashes
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|hash| (hash.clone(), status)).collect()
|
.map(|hash| hash.clone())
|
||||||
|
.collect()
|
||||||
|
);
|
||||||
|
self.pending_listeners.retain(|listener| listener.unbounded_send(to_pending_send.clone()).is_ok());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
let to_full_send: Arc<Vec<(H256, TxStatus)>> = Arc::new(
|
||||||
|
hashes
|
||||||
|
.into_iter()
|
||||||
|
.map(|hash| (hash.clone(), status))
|
||||||
|
.collect()
|
||||||
);
|
);
|
||||||
|
|
||||||
self.tx_statuses_listeners.retain(| listener| listener.unbounded_send(to_send.clone()).is_ok());
|
self.full_listeners.retain(|listener| listener.unbounded_send(to_full_send.clone()).is_ok());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -260,14 +260,16 @@ impl Miner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Set a callback to be notified about imported transactions' hashes.
|
/// Set a callback to be notified about imported transactions' hashes.
|
||||||
pub fn add_transactions_listener(&self, f: Box<Fn(&[H256]) + Send + Sync>) {
|
pub fn pending_transactions_receiver(&self) -> mpsc::UnboundedReceiver<Arc<Vec<H256>>> {
|
||||||
self.transaction_queue.add_listener(f);
|
let (sender, receiver) = mpsc::unbounded();
|
||||||
|
self.transaction_queue.add_pending_listener(sender);
|
||||||
|
receiver
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set a callback to be notified
|
/// Set a callback to be notified about imported transactions' hashes.
|
||||||
pub fn tx_pool_receiver(&self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
|
pub fn full_transactions_receiver(&self) -> mpsc::UnboundedReceiver<Arc<Vec<(H256, TxStatus)>>> {
|
||||||
let (sender, receiver) = mpsc::unbounded();
|
let (sender, receiver) = mpsc::unbounded();
|
||||||
self.transaction_queue.add_tx_pool_listener(sender);
|
self.transaction_queue.add_full_listener(sender);
|
||||||
receiver
|
receiver
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,50 +26,6 @@ use txpool::{self, VerifiedTransaction};
|
|||||||
use pool::VerifiedTransaction as Transaction;
|
use pool::VerifiedTransaction as Transaction;
|
||||||
use pool::TxStatus;
|
use pool::TxStatus;
|
||||||
|
|
||||||
type Listener = Box<Fn(&[H256]) + Send + Sync>;
|
|
||||||
|
|
||||||
/// Manages notifications to pending transaction listeners.
|
|
||||||
#[derive(Default)]
|
|
||||||
pub struct Notifier {
|
|
||||||
listeners: Vec<Listener>,
|
|
||||||
pending: Vec<H256>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for Notifier {
|
|
||||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
fmt.debug_struct("Notifier")
|
|
||||||
.field("listeners", &self.listeners.len())
|
|
||||||
.field("pending", &self.pending)
|
|
||||||
.finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Notifier {
|
|
||||||
/// Add new listener to receive notifications.
|
|
||||||
pub fn add(&mut self, f: Listener) {
|
|
||||||
self.listeners.push(f)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Notify listeners about all currently pending transactions.
|
|
||||||
pub fn notify(&mut self) {
|
|
||||||
if self.pending.is_empty() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for l in &self.listeners {
|
|
||||||
(l)(&self.pending);
|
|
||||||
}
|
|
||||||
|
|
||||||
self.pending.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl txpool::Listener<Transaction> for Notifier {
|
|
||||||
fn added(&mut self, tx: &Arc<Transaction>, _old: Option<&Arc<Transaction>>) {
|
|
||||||
self.pending.push(*tx.hash());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Transaction pool logger.
|
/// Transaction pool logger.
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
pub struct Logger;
|
pub struct Logger;
|
||||||
@ -121,14 +77,20 @@ impl txpool::Listener<Transaction> for Logger {
|
|||||||
/// Transactions pool notifier
|
/// Transactions pool notifier
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct TransactionsPoolNotifier {
|
pub struct TransactionsPoolNotifier {
|
||||||
listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
|
full_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>>,
|
||||||
|
pending_listeners: Vec<mpsc::UnboundedSender<Arc<Vec<H256>>>>,
|
||||||
tx_statuses: Vec<(H256, TxStatus)>,
|
tx_statuses: Vec<(H256, TxStatus)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TransactionsPoolNotifier {
|
impl TransactionsPoolNotifier {
|
||||||
/// Add new listener to receive notifications.
|
/// Add new full listener to receive notifications.
|
||||||
pub fn add(&mut self, f: mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>) {
|
pub fn add_full_listener(&mut self, f: mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>) {
|
||||||
self.listeners.push(f);
|
self.full_listeners.push(f);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add new pending listener to receive notifications.
|
||||||
|
pub fn add_pending_listener(&mut self, f: mpsc::UnboundedSender<Arc<Vec<H256>>>) {
|
||||||
|
self.pending_listeners.push(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Notify listeners about all currently transactions.
|
/// Notify listeners about all currently transactions.
|
||||||
@ -137,16 +99,25 @@ impl TransactionsPoolNotifier {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let to_send = Arc::new(std::mem::replace(&mut self.tx_statuses, Vec::new()));
|
let to_pending_send: Arc<Vec<H256>> = Arc::new(
|
||||||
self.listeners
|
self.tx_statuses.clone()
|
||||||
.retain(|listener| listener.unbounded_send(to_send.clone()).is_ok());
|
.into_iter()
|
||||||
|
.map(|(hash, _)| hash)
|
||||||
|
.collect()
|
||||||
|
);
|
||||||
|
self.pending_listeners.retain(|listener| listener.unbounded_send(to_pending_send.clone()).is_ok());
|
||||||
|
|
||||||
|
let to_full_send = Arc::new(std::mem::replace(&mut self.tx_statuses, Vec::new()));
|
||||||
|
self.full_listeners
|
||||||
|
.retain(|listener| listener.unbounded_send(to_full_send.clone()).is_ok());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl fmt::Debug for TransactionsPoolNotifier {
|
impl fmt::Debug for TransactionsPoolNotifier {
|
||||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||||
fmt.debug_struct("TransactionsPoolNotifier")
|
fmt.debug_struct("TransactionsPoolNotifier")
|
||||||
.field("listeners", &self.listeners.len())
|
.field("full_listeners", &self.full_listeners.len())
|
||||||
|
.field("pending_listeners", &self.pending_listeners.len())
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -180,33 +151,36 @@ impl txpool::Listener<Transaction> for TransactionsPoolNotifier {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use parking_lot::Mutex;
|
|
||||||
use types::transaction;
|
use types::transaction;
|
||||||
use txpool::Listener;
|
use txpool::Listener;
|
||||||
|
use futures::{Stream, Future};
|
||||||
use ethereum_types::Address;
|
use ethereum_types::Address;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_notify_listeners() {
|
fn should_notify_listeners() {
|
||||||
// given
|
// given
|
||||||
let received = Arc::new(Mutex::new(vec![]));
|
let (full_sender, full_receiver) = mpsc::unbounded();
|
||||||
let r = received.clone();
|
let (pending_sender, pending_receiver) = mpsc::unbounded();
|
||||||
let listener = Box::new(move |hashes: &[H256]| {
|
|
||||||
*r.lock() = hashes.iter().map(|x| *x).collect();
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut tx_listener = Notifier::default();
|
let mut tx_listener = TransactionsPoolNotifier::default();
|
||||||
tx_listener.add(listener);
|
tx_listener.add_full_listener(full_sender);
|
||||||
|
tx_listener.add_pending_listener(pending_sender);
|
||||||
|
|
||||||
// when
|
// when
|
||||||
let tx = new_tx();
|
let tx = new_tx();
|
||||||
tx_listener.added(&tx, None);
|
tx_listener.added(&tx, None);
|
||||||
assert_eq!(*received.lock(), vec![]);
|
|
||||||
|
|
||||||
// then
|
// then
|
||||||
tx_listener.notify();
|
tx_listener.notify();
|
||||||
|
let (full_res , _full_receiver)= full_receiver.into_future().wait().unwrap();
|
||||||
|
let (pending_res , _pending_receiver)= pending_receiver.into_future().wait().unwrap();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
*received.lock(),
|
full_res,
|
||||||
vec!["13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d".parse().unwrap()]
|
Some(Arc::new(vec![(serde_json::from_str::<H256>("\"0x13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d\"").unwrap(), TxStatus::Added)]))
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
pending_res,
|
||||||
|
Some(Arc::new(vec![serde_json::from_str::<H256>("\"0x13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d\"").unwrap()]))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,7 +33,7 @@ use pool::{
|
|||||||
};
|
};
|
||||||
use pool::local_transactions::LocalTransactionsList;
|
use pool::local_transactions::LocalTransactionsList;
|
||||||
|
|
||||||
type Listener = (LocalTransactionsList, (listener::Notifier, (listener::Logger, listener::TransactionsPoolNotifier)));
|
type Listener = (LocalTransactionsList, (listener::TransactionsPoolNotifier, listener::Logger));
|
||||||
type Pool = txpool::Pool<pool::VerifiedTransaction, scoring::NonceAndGasPrice, Listener>;
|
type Pool = txpool::Pool<pool::VerifiedTransaction, scoring::NonceAndGasPrice, Listener>;
|
||||||
|
|
||||||
/// Max cache time in milliseconds for pending transactions.
|
/// Max cache time in milliseconds for pending transactions.
|
||||||
@ -305,8 +305,6 @@ impl TransactionQueue {
|
|||||||
// Notify about imported transactions.
|
// Notify about imported transactions.
|
||||||
(self.pool.write().listener_mut().1).0.notify();
|
(self.pool.write().listener_mut().1).0.notify();
|
||||||
|
|
||||||
((self.pool.write().listener_mut().1).1).1.notify();
|
|
||||||
|
|
||||||
if results.iter().any(|r| r.is_ok()) {
|
if results.iter().any(|r| r.is_ok()) {
|
||||||
self.cached_pending.write().clear();
|
self.cached_pending.write().clear();
|
||||||
}
|
}
|
||||||
@ -571,16 +569,16 @@ impl TransactionQueue {
|
|||||||
self.pool.read().listener().0.all_transactions().iter().map(|(a, b)| (*a, b.clone())).collect()
|
self.pool.read().listener().0.all_transactions().iter().map(|(a, b)| (*a, b.clone())).collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a callback to be notified about all transactions entering the pool.
|
/// Add a listener to be notified about all transactions the pool
|
||||||
pub fn add_listener(&self, f: Box<Fn(&[H256]) + Send + Sync>) {
|
pub fn add_pending_listener(&self, f: mpsc::UnboundedSender<Arc<Vec<H256>>>) {
|
||||||
let mut pool = self.pool.write();
|
let mut pool = self.pool.write();
|
||||||
(pool.listener_mut().1).0.add(f);
|
(pool.listener_mut().1).0.add_pending_listener(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a listener to be notified about all transactions the pool
|
/// Add a listener to be notified about all transactions the pool
|
||||||
pub fn add_tx_pool_listener(&self, f: mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>) {
|
pub fn add_full_listener(&self, f: mpsc::UnboundedSender<Arc<Vec<(H256, TxStatus)>>>) {
|
||||||
let mut pool = self.pool.write();
|
let mut pool = self.pool.write();
|
||||||
((pool.listener_mut().1).1).1.add(f);
|
(pool.listener_mut().1).0.add_full_listener(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if pending set is cached.
|
/// Check if pending set is cached.
|
||||||
|
@ -329,8 +329,9 @@ impl FullDependencies {
|
|||||||
}
|
}
|
||||||
Api::EthPubSub => {
|
Api::EthPubSub => {
|
||||||
if !for_generic_pubsub {
|
if !for_generic_pubsub {
|
||||||
|
let pool_receiver = self.miner.pending_transactions_receiver();
|
||||||
let mut client =
|
let mut client =
|
||||||
EthPubSubClient::new(self.client.clone(), self.executor.clone());
|
EthPubSubClient::new(self.client.clone(), self.executor.clone(), pool_receiver);
|
||||||
let weak_client = Arc::downgrade(&self.client);
|
let weak_client = Arc::downgrade(&self.client);
|
||||||
|
|
||||||
client.add_sync_notifier(self.sync.sync_notification(), move |state| {
|
client.add_sync_notifier(self.sync.sync_notification(), move |state| {
|
||||||
@ -345,14 +346,6 @@ impl FullDependencies {
|
|||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
let h = client.handler();
|
|
||||||
self.miner
|
|
||||||
.add_transactions_listener(Box::new(move |hashes| {
|
|
||||||
if let Some(h) = h.upgrade() {
|
|
||||||
h.notify_new_transactions(hashes);
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
|
|
||||||
if let Some(h) = client.handler().upgrade() {
|
if let Some(h) = client.handler().upgrade() {
|
||||||
self.client.add_notify(h);
|
self.client.add_notify(h);
|
||||||
}
|
}
|
||||||
@ -361,7 +354,7 @@ impl FullDependencies {
|
|||||||
}
|
}
|
||||||
Api::ParityTransactionsPool => {
|
Api::ParityTransactionsPool => {
|
||||||
if !for_generic_pubsub {
|
if !for_generic_pubsub {
|
||||||
let receiver = self.miner.tx_pool_receiver();
|
let receiver = self.miner.full_transactions_receiver();
|
||||||
let client = TransactionsPoolClient::new(self.executor.clone(), receiver);
|
let client = TransactionsPoolClient::new(self.executor.clone(), receiver);
|
||||||
handler.extend_with(TransactionsPoolClient::to_delegate(client));
|
handler.extend_with(TransactionsPoolClient::to_delegate(client));
|
||||||
}
|
}
|
||||||
@ -583,6 +576,8 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Api::EthPubSub => {
|
Api::EthPubSub => {
|
||||||
|
let receiver = self.transaction_queue.write().pending_transactions_receiver();
|
||||||
|
|
||||||
let mut client = EthPubSubClient::light(
|
let mut client = EthPubSubClient::light(
|
||||||
self.client.clone(),
|
self.client.clone(),
|
||||||
self.on_demand.clone(),
|
self.on_demand.clone(),
|
||||||
@ -590,6 +585,7 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
|
|||||||
self.cache.clone(),
|
self.cache.clone(),
|
||||||
self.executor.clone(),
|
self.executor.clone(),
|
||||||
self.gas_price_percentile,
|
self.gas_price_percentile,
|
||||||
|
receiver
|
||||||
);
|
);
|
||||||
|
|
||||||
let weak_client = Arc::downgrade(&self.client);
|
let weak_client = Arc::downgrade(&self.client);
|
||||||
@ -607,19 +603,11 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
|
|||||||
});
|
});
|
||||||
|
|
||||||
self.client.add_listener(client.handler() as Weak<_>);
|
self.client.add_listener(client.handler() as Weak<_>);
|
||||||
let h = client.handler();
|
|
||||||
self.transaction_queue
|
|
||||||
.write()
|
|
||||||
.add_listener(Box::new(move |transactions| {
|
|
||||||
if let Some(h) = h.upgrade() {
|
|
||||||
h.notify_new_transactions(transactions);
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
handler.extend_with(EthPubSub::to_delegate(client));
|
handler.extend_with(EthPubSub::to_delegate(client));
|
||||||
}
|
}
|
||||||
Api::ParityTransactionsPool => {
|
Api::ParityTransactionsPool => {
|
||||||
if !for_generic_pubsub {
|
if !for_generic_pubsub {
|
||||||
let receiver = self.transaction_queue.write().tx_statuses_receiver();
|
let receiver = self.transaction_queue.write().full_transactions_receiver();
|
||||||
let client = TransactionsPoolClient::new(self.executor.clone(), receiver);
|
let client = TransactionsPoolClient::new(self.executor.clone(), receiver);
|
||||||
handler.extend_with(TransactionsPoolClient::to_delegate(client));
|
handler.extend_with(TransactionsPoolClient::to_delegate(client));
|
||||||
}
|
}
|
||||||
|
@ -30,7 +30,7 @@ use ethcore::verification::queue::VerifierSettings;
|
|||||||
use ethcore_logger::{Config as LogConfig, RotatingLogger};
|
use ethcore_logger::{Config as LogConfig, RotatingLogger};
|
||||||
use ethcore_service::ClientService;
|
use ethcore_service::ClientService;
|
||||||
use ethereum_types::Address;
|
use ethereum_types::Address;
|
||||||
use futures::IntoFuture;
|
use futures::{IntoFuture, Stream};
|
||||||
use hash_fetch::{self, fetch};
|
use hash_fetch::{self, fetch};
|
||||||
use informant::{Informant, LightNodeInformantData, FullNodeInformantData};
|
use informant::{Informant, LightNodeInformantData, FullNodeInformantData};
|
||||||
use journaldb::Algorithm;
|
use journaldb::Algorithm;
|
||||||
@ -668,14 +668,19 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
|
|||||||
// Propagate transactions as soon as they are imported.
|
// Propagate transactions as soon as they are imported.
|
||||||
let tx = ::parking_lot::Mutex::new(priority_tasks);
|
let tx = ::parking_lot::Mutex::new(priority_tasks);
|
||||||
let is_ready = Arc::new(atomic::AtomicBool::new(true));
|
let is_ready = Arc::new(atomic::AtomicBool::new(true));
|
||||||
miner.add_transactions_listener(Box::new(move |_hashes| {
|
let executor = runtime.executor();
|
||||||
|
let pool_receiver = miner.pending_transactions_receiver();
|
||||||
|
executor.spawn(
|
||||||
|
pool_receiver.for_each(move |_hashes| {
|
||||||
// we want to have only one PendingTransactions task in the queue.
|
// we want to have only one PendingTransactions task in the queue.
|
||||||
if is_ready.compare_and_swap(true, false, atomic::Ordering::SeqCst) {
|
if is_ready.compare_and_swap(true, false, atomic::Ordering::SeqCst) {
|
||||||
let task = ::sync::PriorityTask::PropagateTransactions(Instant::now(), is_ready.clone());
|
let task = ::sync::PriorityTask::PropagateTransactions(Instant::now(), is_ready.clone());
|
||||||
// we ignore error cause it means that we are closing
|
// we ignore error cause it means that we are closing
|
||||||
let _ = tx.lock().send(task);
|
let _ = tx.lock().send(task);
|
||||||
}
|
}
|
||||||
}));
|
Ok(())
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
// provider not added to a notification center is effectively disabled
|
// provider not added to a notification center is effectively disabled
|
||||||
// TODO [debris] refactor it later on
|
// TODO [debris] refactor it later on
|
||||||
|
@ -20,7 +20,7 @@ use std::sync::{Arc, Weak};
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
use jsonrpc_core::{BoxFuture, Result, Error};
|
use jsonrpc_core::{BoxFuture, Result, Error};
|
||||||
use jsonrpc_core::futures::{self, Future, IntoFuture, Stream};
|
use jsonrpc_core::futures::{self, Future, IntoFuture, Stream, sync::mpsc};
|
||||||
use jsonrpc_pubsub::typed::{Sink, Subscriber};
|
use jsonrpc_pubsub::typed::{Sink, Subscriber};
|
||||||
use jsonrpc_pubsub::SubscriptionId;
|
use jsonrpc_pubsub::SubscriptionId;
|
||||||
|
|
||||||
@ -80,23 +80,39 @@ impl<C> EthPubSubClient<C>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<C> EthPubSubClient<C> {
|
impl<C> EthPubSubClient<C>
|
||||||
|
where
|
||||||
|
C: 'static + Send + Sync {
|
||||||
|
|
||||||
/// Creates new `EthPubSubClient`.
|
/// Creates new `EthPubSubClient`.
|
||||||
pub fn new(client: Arc<C>, executor: Executor) -> Self {
|
pub fn new(client: Arc<C>, executor: Executor, pool_receiver: mpsc::UnboundedReceiver<Arc<Vec<H256>>>) -> Self {
|
||||||
let heads_subscribers = Arc::new(RwLock::new(Subscribers::default()));
|
let heads_subscribers = Arc::new(RwLock::new(Subscribers::default()));
|
||||||
let logs_subscribers = Arc::new(RwLock::new(Subscribers::default()));
|
let logs_subscribers = Arc::new(RwLock::new(Subscribers::default()));
|
||||||
let transactions_subscribers = Arc::new(RwLock::new(Subscribers::default()));
|
let transactions_subscribers = Arc::new(RwLock::new(Subscribers::default()));
|
||||||
let sync_subscribers = Arc::new(RwLock::new(Subscribers::default()));
|
let sync_subscribers = Arc::new(RwLock::new(Subscribers::default()));
|
||||||
|
|
||||||
EthPubSubClient {
|
let handler = Arc::new(ChainNotificationHandler {
|
||||||
handler: Arc::new(ChainNotificationHandler {
|
|
||||||
client,
|
client,
|
||||||
executor,
|
executor,
|
||||||
heads_subscribers: heads_subscribers.clone(),
|
heads_subscribers: heads_subscribers.clone(),
|
||||||
logs_subscribers: logs_subscribers.clone(),
|
logs_subscribers: logs_subscribers.clone(),
|
||||||
transactions_subscribers: transactions_subscribers.clone(),
|
transactions_subscribers: transactions_subscribers.clone(),
|
||||||
sync_subscribers: sync_subscribers.clone(),
|
sync_subscribers: sync_subscribers.clone(),
|
||||||
}),
|
});
|
||||||
|
let handler2 = Arc::downgrade(&handler);
|
||||||
|
|
||||||
|
handler.executor.spawn(pool_receiver
|
||||||
|
.for_each(move |hashes| {
|
||||||
|
if let Some(handler2) = handler2.upgrade() {
|
||||||
|
handler2.notify_new_transactions(&hashes.to_vec());
|
||||||
|
return Ok(())
|
||||||
|
}
|
||||||
|
Err(())
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
EthPubSubClient {
|
||||||
|
handler,
|
||||||
sync_subscribers,
|
sync_subscribers,
|
||||||
heads_subscribers,
|
heads_subscribers,
|
||||||
logs_subscribers,
|
logs_subscribers,
|
||||||
@ -123,6 +139,7 @@ where
|
|||||||
cache: Arc<Mutex<Cache>>,
|
cache: Arc<Mutex<Cache>>,
|
||||||
executor: Executor,
|
executor: Executor,
|
||||||
gas_price_percentile: usize,
|
gas_price_percentile: usize,
|
||||||
|
pool_receiver: mpsc::UnboundedReceiver<Arc<Vec<H256>>>
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let fetch = LightFetch {
|
let fetch = LightFetch {
|
||||||
client,
|
client,
|
||||||
@ -131,7 +148,7 @@ where
|
|||||||
cache,
|
cache,
|
||||||
gas_price_percentile,
|
gas_price_percentile,
|
||||||
};
|
};
|
||||||
EthPubSubClient::new(Arc::new(fetch), executor)
|
EthPubSubClient::new(Arc::new(fetch), executor, pool_receiver)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -205,7 +222,7 @@ impl<C> ChainNotificationHandler<C> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Notify all subscribers about new transaction hashes.
|
/// Notify all subscribers about new transaction hashes.
|
||||||
pub fn notify_new_transactions(&self, hashes: &[H256]) {
|
fn notify_new_transactions(&self, hashes: &[H256]) {
|
||||||
for subscriber in self.transactions_subscribers.read().values() {
|
for subscriber in self.transactions_subscribers.read().values() {
|
||||||
for hash in hashes {
|
for hash in hashes {
|
||||||
Self::notify(&self.executor, subscriber, pubsub::Result::TransactionHash(*hash));
|
Self::notify(&self.executor, subscriber, pubsub::Result::TransactionHash(*hash));
|
||||||
|
@ -17,7 +17,7 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use jsonrpc_core::MetaIoHandler;
|
use jsonrpc_core::MetaIoHandler;
|
||||||
use jsonrpc_core::futures::{self, Stream, Future};
|
use jsonrpc_core::futures::{self, Stream, Future, sync::mpsc};
|
||||||
use jsonrpc_pubsub::Session;
|
use jsonrpc_pubsub::Session;
|
||||||
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@ -40,7 +40,9 @@ fn should_subscribe_to_new_heads() {
|
|||||||
let h2 = client.block_hash_delta_minus(2);
|
let h2 = client.block_hash_delta_minus(2);
|
||||||
let h1 = client.block_hash_delta_minus(3);
|
let h1 = client.block_hash_delta_minus(3);
|
||||||
|
|
||||||
let pubsub = EthPubSubClient::new(Arc::new(client), el.executor());
|
let (_, pool_receiver) = mpsc::unbounded();
|
||||||
|
|
||||||
|
let pubsub = EthPubSubClient::new(Arc::new(client), el.executor(), pool_receiver);
|
||||||
let handler = pubsub.handler().upgrade().unwrap();
|
let handler = pubsub.handler().upgrade().unwrap();
|
||||||
let pubsub = pubsub.to_delegate();
|
let pubsub = pubsub.to_delegate();
|
||||||
|
|
||||||
@ -112,7 +114,9 @@ fn should_subscribe_to_logs() {
|
|||||||
}
|
}
|
||||||
]);
|
]);
|
||||||
|
|
||||||
let pubsub = EthPubSubClient::new(Arc::new(client), el.executor());
|
let (_, pool_receiver) = mpsc::unbounded();
|
||||||
|
|
||||||
|
let pubsub = EthPubSubClient::new(Arc::new(client), el.executor(), pool_receiver);
|
||||||
let handler = pubsub.handler().upgrade().unwrap();
|
let handler = pubsub.handler().upgrade().unwrap();
|
||||||
let pubsub = pubsub.to_delegate();
|
let pubsub = pubsub.to_delegate();
|
||||||
|
|
||||||
@ -159,8 +163,9 @@ fn should_subscribe_to_pending_transactions() {
|
|||||||
let el = Runtime::with_thread_count(1);
|
let el = Runtime::with_thread_count(1);
|
||||||
let client = TestBlockChainClient::new();
|
let client = TestBlockChainClient::new();
|
||||||
|
|
||||||
let pubsub = EthPubSubClient::new(Arc::new(client), el.executor());
|
let (pool_sender, pool_receiver) = mpsc::unbounded();
|
||||||
let handler = pubsub.handler().upgrade().unwrap();
|
|
||||||
|
let pubsub = EthPubSubClient::new(Arc::new(client), el.executor(), pool_receiver);
|
||||||
let pubsub = pubsub.to_delegate();
|
let pubsub = pubsub.to_delegate();
|
||||||
|
|
||||||
let mut io = MetaIoHandler::default();
|
let mut io = MetaIoHandler::default();
|
||||||
@ -181,7 +186,7 @@ fn should_subscribe_to_pending_transactions() {
|
|||||||
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
|
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
|
||||||
|
|
||||||
// Send new transactions
|
// Send new transactions
|
||||||
handler.notify_new_transactions(&[H256::from_low_u64_be(5), H256::from_low_u64_be(7)]);
|
pool_sender.unbounded_send(Arc::new(vec![H256::from_low_u64_be(5), H256::from_low_u64_be(7)])).unwrap();
|
||||||
|
|
||||||
let (res, receiver) = receiver.into_future().wait().unwrap();
|
let (res, receiver) = receiver.into_future().wait().unwrap();
|
||||||
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x0000000000000000000000000000000000000000000000000000000000000005","subscription":"0x43ca64edf03768e1"}}"#;
|
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x0000000000000000000000000000000000000000000000000000000000000005","subscription":"0x43ca64edf03768e1"}}"#;
|
||||||
@ -205,7 +210,8 @@ fn eth_subscribe_syncing() {
|
|||||||
// given
|
// given
|
||||||
let el = Runtime::with_thread_count(1);
|
let el = Runtime::with_thread_count(1);
|
||||||
let client = TestBlockChainClient::new();
|
let client = TestBlockChainClient::new();
|
||||||
let pubsub = EthPubSubClient::new(Arc::new(client), el.executor());
|
let (_, pool_receiver) = mpsc::unbounded();
|
||||||
|
let pubsub = EthPubSubClient::new(Arc::new(client), el.executor(), pool_receiver);
|
||||||
let pubsub = pubsub.to_delegate();
|
let pubsub = pubsub.to_delegate();
|
||||||
|
|
||||||
let mut io = MetaIoHandler::default();
|
let mut io = MetaIoHandler::default();
|
||||||
|
Loading…
Reference in New Issue
Block a user