From faf6f1f9eac859b9e8a0c1f9e2b6dd2cc5490b34 Mon Sep 17 00:00:00 2001 From: Sfxdx <35330335+IntegralTeam@users.noreply.github.com> Date: Tue, 4 Jun 2019 18:35:33 +0700 Subject: [PATCH] Merge `Notifier` and `TransactionsPoolNotifier` (#10591) * Merge `Notifier` and `TransactionsPoolNotifier` * fix tests --- ethcore/light/src/transaction_queue.rs | 37 +++++---- ethcore/src/miner/miner.rs | 12 +-- miner/src/pool/listener.rs | 100 +++++++++---------------- miner/src/pool/queue.rs | 16 ++-- parity/rpc_apis.rs | 26 ++----- parity/run.rs | 23 +++--- rpc/src/v1/impls/eth_pubsub.rs | 43 +++++++---- rpc/src/v1/tests/mocked/eth_pubsub.rs | 20 +++-- 8 files changed, 138 insertions(+), 139 deletions(-) diff --git a/ethcore/light/src/transaction_queue.rs b/ethcore/light/src/transaction_queue.rs index 2040eabe2..52defd654 100644 --- a/ethcore/light/src/transaction_queue.rs +++ b/ethcore/light/src/transaction_queue.rs @@ -129,15 +129,13 @@ pub enum ImportDestination { Future, } -type Listener = Box; - /// Light transaction queue. See module docs for more details. #[derive(Default)] pub struct TransactionQueue { by_account: HashMap, by_hash: H256FastMap, - listeners: Vec, - tx_statuses_listeners: Vec>>>, + pending_listeners: Vec>>>, + full_listeners: Vec>>>, } impl fmt::Debug for TransactionQueue { @@ -145,7 +143,8 @@ impl fmt::Debug for TransactionQueue { fmt.debug_struct("TransactionQueue") .field("by_account", &self.by_account) .field("by_hash", &self.by_hash) - .field("listeners", &self.listeners.len()) + .field("pending_listeners", &self.pending_listeners.len()) + .field("full_listeners", &self.pending_listeners.len()) .finish() } } @@ -360,30 +359,40 @@ impl TransactionQueue { } /// Add a transaction queue listener. - pub fn add_listener(&mut self, f: Listener) { - self.listeners.push(f); + pub fn pending_transactions_receiver(&mut self) -> mpsc::UnboundedReceiver>> { + let (sender, receiver) = mpsc::unbounded(); + self.pending_listeners.push(sender); + receiver } /// Add a transaction queue listener. - pub fn tx_statuses_receiver(&mut self) -> mpsc::UnboundedReceiver>> { + pub fn full_transactions_receiver(&mut self) -> mpsc::UnboundedReceiver>> { let (sender, receiver) = mpsc::unbounded(); - self.tx_statuses_listeners.push(sender); + self.full_listeners.push(sender); receiver } /// Notifies all listeners about new pending transaction. fn notify(&mut self, hashes: &[H256], status: TxStatus) { - for listener in &self.listeners { - listener(hashes) + if status == TxStatus::Added { + let to_pending_send: Arc> = Arc::new( + hashes + .into_iter() + .map(|hash| hash.clone()) + .collect() + ); + self.pending_listeners.retain(|listener| listener.unbounded_send(to_pending_send.clone()).is_ok()); + } - let to_send: Arc> = Arc::new( + let to_full_send: Arc> = Arc::new( hashes .into_iter() - .map(|hash| (hash.clone(), status)).collect() + .map(|hash| (hash.clone(), status)) + .collect() ); - self.tx_statuses_listeners.retain(| listener| listener.unbounded_send(to_send.clone()).is_ok()); + self.full_listeners.retain(|listener| listener.unbounded_send(to_full_send.clone()).is_ok()); } } diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 0c3f94acd..67ef9e618 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -260,14 +260,16 @@ impl Miner { } /// Set a callback to be notified about imported transactions' hashes. - pub fn add_transactions_listener(&self, f: Box) { - self.transaction_queue.add_listener(f); + pub fn pending_transactions_receiver(&self) -> mpsc::UnboundedReceiver>> { + let (sender, receiver) = mpsc::unbounded(); + self.transaction_queue.add_pending_listener(sender); + receiver } - /// Set a callback to be notified - pub fn tx_pool_receiver(&self) -> mpsc::UnboundedReceiver>> { + /// Set a callback to be notified about imported transactions' hashes. + pub fn full_transactions_receiver(&self) -> mpsc::UnboundedReceiver>> { let (sender, receiver) = mpsc::unbounded(); - self.transaction_queue.add_tx_pool_listener(sender); + self.transaction_queue.add_full_listener(sender); receiver } diff --git a/miner/src/pool/listener.rs b/miner/src/pool/listener.rs index 464f73be1..998305904 100644 --- a/miner/src/pool/listener.rs +++ b/miner/src/pool/listener.rs @@ -26,50 +26,6 @@ use txpool::{self, VerifiedTransaction}; use pool::VerifiedTransaction as Transaction; use pool::TxStatus; -type Listener = Box; - -/// Manages notifications to pending transaction listeners. -#[derive(Default)] -pub struct Notifier { - listeners: Vec, - pending: Vec, -} - -impl fmt::Debug for Notifier { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("Notifier") - .field("listeners", &self.listeners.len()) - .field("pending", &self.pending) - .finish() - } -} - -impl Notifier { - /// Add new listener to receive notifications. - pub fn add(&mut self, f: Listener) { - self.listeners.push(f) - } - - /// Notify listeners about all currently pending transactions. - pub fn notify(&mut self) { - if self.pending.is_empty() { - return; - } - - for l in &self.listeners { - (l)(&self.pending); - } - - self.pending.clear(); - } -} - -impl txpool::Listener for Notifier { - fn added(&mut self, tx: &Arc, _old: Option<&Arc>) { - self.pending.push(*tx.hash()); - } -} - /// Transaction pool logger. #[derive(Default, Debug)] pub struct Logger; @@ -121,14 +77,20 @@ impl txpool::Listener for Logger { /// Transactions pool notifier #[derive(Default)] pub struct TransactionsPoolNotifier { - listeners: Vec>>>, + full_listeners: Vec>>>, + pending_listeners: Vec>>>, tx_statuses: Vec<(H256, TxStatus)>, } impl TransactionsPoolNotifier { - /// Add new listener to receive notifications. - pub fn add(&mut self, f: mpsc::UnboundedSender>>) { - self.listeners.push(f); + /// Add new full listener to receive notifications. + pub fn add_full_listener(&mut self, f: mpsc::UnboundedSender>>) { + self.full_listeners.push(f); + } + + /// Add new pending listener to receive notifications. + pub fn add_pending_listener(&mut self, f: mpsc::UnboundedSender>>) { + self.pending_listeners.push(f); } /// Notify listeners about all currently transactions. @@ -137,16 +99,25 @@ impl TransactionsPoolNotifier { return; } - let to_send = Arc::new(std::mem::replace(&mut self.tx_statuses, Vec::new())); - self.listeners - .retain(|listener| listener.unbounded_send(to_send.clone()).is_ok()); + let to_pending_send: Arc> = Arc::new( + self.tx_statuses.clone() + .into_iter() + .map(|(hash, _)| hash) + .collect() + ); + self.pending_listeners.retain(|listener| listener.unbounded_send(to_pending_send.clone()).is_ok()); + + let to_full_send = Arc::new(std::mem::replace(&mut self.tx_statuses, Vec::new())); + self.full_listeners + .retain(|listener| listener.unbounded_send(to_full_send.clone()).is_ok()); } } impl fmt::Debug for TransactionsPoolNotifier { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("TransactionsPoolNotifier") - .field("listeners", &self.listeners.len()) + .field("full_listeners", &self.full_listeners.len()) + .field("pending_listeners", &self.pending_listeners.len()) .finish() } } @@ -180,33 +151,36 @@ impl txpool::Listener for TransactionsPoolNotifier { #[cfg(test)] mod tests { use super::*; - use parking_lot::Mutex; use types::transaction; use txpool::Listener; + use futures::{Stream, Future}; use ethereum_types::Address; #[test] fn should_notify_listeners() { // given - let received = Arc::new(Mutex::new(vec![])); - let r = received.clone(); - let listener = Box::new(move |hashes: &[H256]| { - *r.lock() = hashes.iter().map(|x| *x).collect(); - }); + let (full_sender, full_receiver) = mpsc::unbounded(); + let (pending_sender, pending_receiver) = mpsc::unbounded(); - let mut tx_listener = Notifier::default(); - tx_listener.add(listener); + let mut tx_listener = TransactionsPoolNotifier::default(); + tx_listener.add_full_listener(full_sender); + tx_listener.add_pending_listener(pending_sender); // when let tx = new_tx(); tx_listener.added(&tx, None); - assert_eq!(*received.lock(), vec![]); // then tx_listener.notify(); + let (full_res , _full_receiver)= full_receiver.into_future().wait().unwrap(); + let (pending_res , _pending_receiver)= pending_receiver.into_future().wait().unwrap(); assert_eq!( - *received.lock(), - vec!["13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d".parse().unwrap()] + full_res, + Some(Arc::new(vec![(serde_json::from_str::("\"0x13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d\"").unwrap(), TxStatus::Added)])) + ); + assert_eq!( + pending_res, + Some(Arc::new(vec![serde_json::from_str::("\"0x13aff4201ac1dc49daf6a7cf07b558ed956511acbaabf9502bdacc353953766d\"").unwrap()])) ); } diff --git a/miner/src/pool/queue.rs b/miner/src/pool/queue.rs index 2d8046bd9..8d804e968 100644 --- a/miner/src/pool/queue.rs +++ b/miner/src/pool/queue.rs @@ -33,7 +33,7 @@ use pool::{ }; use pool::local_transactions::LocalTransactionsList; -type Listener = (LocalTransactionsList, (listener::Notifier, (listener::Logger, listener::TransactionsPoolNotifier))); +type Listener = (LocalTransactionsList, (listener::TransactionsPoolNotifier, listener::Logger)); type Pool = txpool::Pool; /// Max cache time in milliseconds for pending transactions. @@ -305,8 +305,6 @@ impl TransactionQueue { // Notify about imported transactions. (self.pool.write().listener_mut().1).0.notify(); - ((self.pool.write().listener_mut().1).1).1.notify(); - if results.iter().any(|r| r.is_ok()) { self.cached_pending.write().clear(); } @@ -499,7 +497,7 @@ impl TransactionQueue { /// removes them from the pool. /// That method should be used if invalid transactions are detected /// or you want to cancel a transaction. - pub fn remove<'a, T: IntoIterator>( + pub fn remove<'a, T: IntoIterator>( &self, hashes: T, is_invalid: bool, @@ -571,16 +569,16 @@ impl TransactionQueue { self.pool.read().listener().0.all_transactions().iter().map(|(a, b)| (*a, b.clone())).collect() } - /// Add a callback to be notified about all transactions entering the pool. - pub fn add_listener(&self, f: Box) { + /// Add a listener to be notified about all transactions the pool + pub fn add_pending_listener(&self, f: mpsc::UnboundedSender>>) { let mut pool = self.pool.write(); - (pool.listener_mut().1).0.add(f); + (pool.listener_mut().1).0.add_pending_listener(f); } /// Add a listener to be notified about all transactions the pool - pub fn add_tx_pool_listener(&self, f: mpsc::UnboundedSender>>) { + pub fn add_full_listener(&self, f: mpsc::UnboundedSender>>) { let mut pool = self.pool.write(); - ((pool.listener_mut().1).1).1.add(f); + (pool.listener_mut().1).0.add_full_listener(f); } /// Check if pending set is cached. diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 4693f18ab..66d376c03 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -329,8 +329,9 @@ impl FullDependencies { } Api::EthPubSub => { if !for_generic_pubsub { + let pool_receiver = self.miner.pending_transactions_receiver(); let mut client = - EthPubSubClient::new(self.client.clone(), self.executor.clone()); + EthPubSubClient::new(self.client.clone(), self.executor.clone(), pool_receiver); let weak_client = Arc::downgrade(&self.client); client.add_sync_notifier(self.sync.sync_notification(), move |state| { @@ -345,14 +346,6 @@ impl FullDependencies { }) }); - let h = client.handler(); - self.miner - .add_transactions_listener(Box::new(move |hashes| { - if let Some(h) = h.upgrade() { - h.notify_new_transactions(hashes); - } - })); - if let Some(h) = client.handler().upgrade() { self.client.add_notify(h); } @@ -361,7 +354,7 @@ impl FullDependencies { } Api::ParityTransactionsPool => { if !for_generic_pubsub { - let receiver = self.miner.tx_pool_receiver(); + let receiver = self.miner.full_transactions_receiver(); let client = TransactionsPoolClient::new(self.executor.clone(), receiver); handler.extend_with(TransactionsPoolClient::to_delegate(client)); } @@ -583,6 +576,8 @@ impl LightDependencies { } } Api::EthPubSub => { + let receiver = self.transaction_queue.write().pending_transactions_receiver(); + let mut client = EthPubSubClient::light( self.client.clone(), self.on_demand.clone(), @@ -590,6 +585,7 @@ impl LightDependencies { self.cache.clone(), self.executor.clone(), self.gas_price_percentile, + receiver ); let weak_client = Arc::downgrade(&self.client); @@ -607,19 +603,11 @@ impl LightDependencies { }); self.client.add_listener(client.handler() as Weak<_>); - let h = client.handler(); - self.transaction_queue - .write() - .add_listener(Box::new(move |transactions| { - if let Some(h) = h.upgrade() { - h.notify_new_transactions(transactions); - } - })); handler.extend_with(EthPubSub::to_delegate(client)); } Api::ParityTransactionsPool => { if !for_generic_pubsub { - let receiver = self.transaction_queue.write().tx_statuses_receiver(); + let receiver = self.transaction_queue.write().full_transactions_receiver(); let client = TransactionsPoolClient::new(self.executor.clone(), receiver); handler.extend_with(TransactionsPoolClient::to_delegate(client)); } diff --git a/parity/run.rs b/parity/run.rs index 5858dcf5e..c21399290 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -30,7 +30,7 @@ use ethcore::verification::queue::VerifierSettings; use ethcore_logger::{Config as LogConfig, RotatingLogger}; use ethcore_service::ClientService; use ethereum_types::Address; -use futures::IntoFuture; +use futures::{IntoFuture, Stream}; use hash_fetch::{self, fetch}; use informant::{Informant, LightNodeInformantData, FullNodeInformantData}; use journaldb::Algorithm; @@ -668,14 +668,19 @@ fn execute_impl(cmd: RunCmd, logger: Arc, on_client_rq: // Propagate transactions as soon as they are imported. let tx = ::parking_lot::Mutex::new(priority_tasks); let is_ready = Arc::new(atomic::AtomicBool::new(true)); - miner.add_transactions_listener(Box::new(move |_hashes| { - // we want to have only one PendingTransactions task in the queue. - if is_ready.compare_and_swap(true, false, atomic::Ordering::SeqCst) { - let task = ::sync::PriorityTask::PropagateTransactions(Instant::now(), is_ready.clone()); - // we ignore error cause it means that we are closing - let _ = tx.lock().send(task); - } - })); + let executor = runtime.executor(); + let pool_receiver = miner.pending_transactions_receiver(); + executor.spawn( + pool_receiver.for_each(move |_hashes| { + // we want to have only one PendingTransactions task in the queue. + if is_ready.compare_and_swap(true, false, atomic::Ordering::SeqCst) { + let task = ::sync::PriorityTask::PropagateTransactions(Instant::now(), is_ready.clone()); + // we ignore error cause it means that we are closing + let _ = tx.lock().send(task); + } + Ok(()) + }) + ); // provider not added to a notification center is effectively disabled // TODO [debris] refactor it later on diff --git a/rpc/src/v1/impls/eth_pubsub.rs b/rpc/src/v1/impls/eth_pubsub.rs index 019cbd39f..acb0d5f0e 100644 --- a/rpc/src/v1/impls/eth_pubsub.rs +++ b/rpc/src/v1/impls/eth_pubsub.rs @@ -20,7 +20,7 @@ use std::sync::{Arc, Weak}; use std::collections::BTreeMap; use jsonrpc_core::{BoxFuture, Result, Error}; -use jsonrpc_core::futures::{self, Future, IntoFuture, Stream}; +use jsonrpc_core::futures::{self, Future, IntoFuture, Stream, sync::mpsc}; use jsonrpc_pubsub::typed::{Sink, Subscriber}; use jsonrpc_pubsub::SubscriptionId; @@ -80,23 +80,39 @@ impl EthPubSubClient } } -impl EthPubSubClient { +impl EthPubSubClient + where + C: 'static + Send + Sync { + /// Creates new `EthPubSubClient`. - pub fn new(client: Arc, executor: Executor) -> Self { + pub fn new(client: Arc, executor: Executor, pool_receiver: mpsc::UnboundedReceiver>>) -> Self { let heads_subscribers = Arc::new(RwLock::new(Subscribers::default())); let logs_subscribers = Arc::new(RwLock::new(Subscribers::default())); let transactions_subscribers = Arc::new(RwLock::new(Subscribers::default())); let sync_subscribers = Arc::new(RwLock::new(Subscribers::default())); + let handler = Arc::new(ChainNotificationHandler { + client, + executor, + heads_subscribers: heads_subscribers.clone(), + logs_subscribers: logs_subscribers.clone(), + transactions_subscribers: transactions_subscribers.clone(), + sync_subscribers: sync_subscribers.clone(), + }); + let handler2 = Arc::downgrade(&handler); + + handler.executor.spawn(pool_receiver + .for_each(move |hashes| { + if let Some(handler2) = handler2.upgrade() { + handler2.notify_new_transactions(&hashes.to_vec()); + return Ok(()) + } + Err(()) + }) + ); + EthPubSubClient { - handler: Arc::new(ChainNotificationHandler { - client, - executor, - heads_subscribers: heads_subscribers.clone(), - logs_subscribers: logs_subscribers.clone(), - transactions_subscribers: transactions_subscribers.clone(), - sync_subscribers: sync_subscribers.clone(), - }), + handler, sync_subscribers, heads_subscribers, logs_subscribers, @@ -123,6 +139,7 @@ where cache: Arc>, executor: Executor, gas_price_percentile: usize, + pool_receiver: mpsc::UnboundedReceiver>> ) -> Self { let fetch = LightFetch { client, @@ -131,7 +148,7 @@ where cache, gas_price_percentile, }; - EthPubSubClient::new(Arc::new(fetch), executor) + EthPubSubClient::new(Arc::new(fetch), executor, pool_receiver) } } @@ -205,7 +222,7 @@ impl ChainNotificationHandler { } /// Notify all subscribers about new transaction hashes. - pub fn notify_new_transactions(&self, hashes: &[H256]) { + fn notify_new_transactions(&self, hashes: &[H256]) { for subscriber in self.transactions_subscribers.read().values() { for hash in hashes { Self::notify(&self.executor, subscriber, pubsub::Result::TransactionHash(*hash)); diff --git a/rpc/src/v1/tests/mocked/eth_pubsub.rs b/rpc/src/v1/tests/mocked/eth_pubsub.rs index 071e0eace..1336f4e15 100644 --- a/rpc/src/v1/tests/mocked/eth_pubsub.rs +++ b/rpc/src/v1/tests/mocked/eth_pubsub.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use jsonrpc_core::MetaIoHandler; -use jsonrpc_core::futures::{self, Stream, Future}; +use jsonrpc_core::futures::{self, Stream, Future, sync::mpsc}; use jsonrpc_pubsub::Session; use std::time::Duration; @@ -40,7 +40,9 @@ fn should_subscribe_to_new_heads() { let h2 = client.block_hash_delta_minus(2); let h1 = client.block_hash_delta_minus(3); - let pubsub = EthPubSubClient::new(Arc::new(client), el.executor()); + let (_, pool_receiver) = mpsc::unbounded(); + + let pubsub = EthPubSubClient::new(Arc::new(client), el.executor(), pool_receiver); let handler = pubsub.handler().upgrade().unwrap(); let pubsub = pubsub.to_delegate(); @@ -112,7 +114,9 @@ fn should_subscribe_to_logs() { } ]); - let pubsub = EthPubSubClient::new(Arc::new(client), el.executor()); + let (_, pool_receiver) = mpsc::unbounded(); + + let pubsub = EthPubSubClient::new(Arc::new(client), el.executor(), pool_receiver); let handler = pubsub.handler().upgrade().unwrap(); let pubsub = pubsub.to_delegate(); @@ -159,8 +163,9 @@ fn should_subscribe_to_pending_transactions() { let el = Runtime::with_thread_count(1); let client = TestBlockChainClient::new(); - let pubsub = EthPubSubClient::new(Arc::new(client), el.executor()); - let handler = pubsub.handler().upgrade().unwrap(); + let (pool_sender, pool_receiver) = mpsc::unbounded(); + + let pubsub = EthPubSubClient::new(Arc::new(client), el.executor(), pool_receiver); let pubsub = pubsub.to_delegate(); let mut io = MetaIoHandler::default(); @@ -181,7 +186,7 @@ fn should_subscribe_to_pending_transactions() { assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); // Send new transactions - handler.notify_new_transactions(&[H256::from_low_u64_be(5), H256::from_low_u64_be(7)]); + pool_sender.unbounded_send(Arc::new(vec![H256::from_low_u64_be(5), H256::from_low_u64_be(7)])).unwrap(); let (res, receiver) = receiver.into_future().wait().unwrap(); let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x0000000000000000000000000000000000000000000000000000000000000005","subscription":"0x43ca64edf03768e1"}}"#; @@ -205,7 +210,8 @@ fn eth_subscribe_syncing() { // given let el = Runtime::with_thread_count(1); let client = TestBlockChainClient::new(); - let pubsub = EthPubSubClient::new(Arc::new(client), el.executor()); + let (_, pool_receiver) = mpsc::unbounded(); + let pubsub = EthPubSubClient::new(Arc::new(client), el.executor(), pool_receiver); let pubsub = pubsub.to_delegate(); let mut io = MetaIoHandler::default();