Pending transactions subscription (#7906)
* Pending transactions subscription. * Pending transactions Pub-Sub * Add light client support. * Remove redundant Ok
This commit is contained in:
parent
899dd0ff4b
commit
236a4aac22
@ -23,6 +23,7 @@
|
|||||||
//! accounts for which they create transactions, this queue is structured in an
|
//! accounts for which they create transactions, this queue is structured in an
|
||||||
//! address-wise manner.
|
//! address-wise manner.
|
||||||
|
|
||||||
|
use std::fmt;
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::{BTreeMap, HashMap};
|
||||||
use std::collections::hash_map::Entry;
|
use std::collections::hash_map::Entry;
|
||||||
|
|
||||||
@ -99,25 +100,44 @@ impl AccountTransactions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// attempt to move transactions from the future queue into the current queue.
|
// attempt to move transactions from the future queue into the current queue.
|
||||||
fn adjust_future(&mut self) {
|
fn adjust_future(&mut self) -> Vec<H256> {
|
||||||
|
let mut promoted = Vec::new();
|
||||||
let mut next_nonce = self.next_nonce();
|
let mut next_nonce = self.next_nonce();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match self.future.remove(&next_nonce) {
|
match self.future.remove(&next_nonce) {
|
||||||
Some(tx) => self.current.push(tx),
|
Some(tx) => {
|
||||||
|
promoted.push(tx.hash);
|
||||||
|
self.current.push(tx)
|
||||||
|
},
|
||||||
None => break,
|
None => break,
|
||||||
}
|
}
|
||||||
|
|
||||||
next_nonce = next_nonce + 1.into();
|
next_nonce = next_nonce + 1.into();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
promoted
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Listener = Box<Fn(&[H256]) + Send + Sync>;
|
||||||
|
|
||||||
/// Light transaction queue. See module docs for more details.
|
/// Light transaction queue. See module docs for more details.
|
||||||
#[derive(Debug, Default, Clone, PartialEq, Eq)]
|
#[derive(Default)]
|
||||||
pub struct TransactionQueue {
|
pub struct TransactionQueue {
|
||||||
by_account: HashMap<Address, AccountTransactions>,
|
by_account: HashMap<Address, AccountTransactions>,
|
||||||
by_hash: H256FastMap<PendingTransaction>,
|
by_hash: H256FastMap<PendingTransaction>,
|
||||||
|
listeners: Vec<Listener>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for TransactionQueue {
|
||||||
|
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
fmt.debug_struct("TransactionQueue")
|
||||||
|
.field("by_account", &self.by_account)
|
||||||
|
.field("by_hash", &self.by_hash)
|
||||||
|
.field("listeners", &self.listeners.len())
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TransactionQueue {
|
impl TransactionQueue {
|
||||||
@ -130,7 +150,7 @@ impl TransactionQueue {
|
|||||||
|
|
||||||
if self.by_hash.contains_key(&hash) { return Err(transaction::Error::AlreadyImported) }
|
if self.by_hash.contains_key(&hash) { return Err(transaction::Error::AlreadyImported) }
|
||||||
|
|
||||||
let res = match self.by_account.entry(sender) {
|
let (res, promoted) = match self.by_account.entry(sender) {
|
||||||
Entry::Vacant(entry) => {
|
Entry::Vacant(entry) => {
|
||||||
entry.insert(AccountTransactions {
|
entry.insert(AccountTransactions {
|
||||||
cur_nonce: CurrentNonce::Assumed(nonce),
|
cur_nonce: CurrentNonce::Assumed(nonce),
|
||||||
@ -138,7 +158,7 @@ impl TransactionQueue {
|
|||||||
future: BTreeMap::new(),
|
future: BTreeMap::new(),
|
||||||
});
|
});
|
||||||
|
|
||||||
transaction::ImportResult::Current
|
(transaction::ImportResult::Current, vec![hash])
|
||||||
}
|
}
|
||||||
Entry::Occupied(mut entry) => {
|
Entry::Occupied(mut entry) => {
|
||||||
let acct_txs = entry.get_mut();
|
let acct_txs = entry.get_mut();
|
||||||
@ -160,7 +180,7 @@ impl TransactionQueue {
|
|||||||
let old = ::std::mem::replace(&mut acct_txs.current[idx], tx_info);
|
let old = ::std::mem::replace(&mut acct_txs.current[idx], tx_info);
|
||||||
self.by_hash.remove(&old.hash);
|
self.by_hash.remove(&old.hash);
|
||||||
|
|
||||||
transaction::ImportResult::Current
|
(transaction::ImportResult::Current, vec![hash])
|
||||||
}
|
}
|
||||||
Err(idx) => {
|
Err(idx) => {
|
||||||
let cur_len = acct_txs.current.len();
|
let cur_len = acct_txs.current.len();
|
||||||
@ -182,21 +202,22 @@ impl TransactionQueue {
|
|||||||
acct_txs.future.insert(future_nonce, future);
|
acct_txs.future.insert(future_nonce, future);
|
||||||
}
|
}
|
||||||
|
|
||||||
transaction::ImportResult::Current
|
(transaction::ImportResult::Current, vec![hash])
|
||||||
} else if idx == cur_len && acct_txs.current.last().map_or(false, |f| f.nonce + 1.into() != nonce) {
|
} else if idx == cur_len && acct_txs.current.last().map_or(false, |f| f.nonce + 1.into() != nonce) {
|
||||||
trace!(target: "txqueue", "Queued future transaction for {}, nonce={}", sender, nonce);
|
trace!(target: "txqueue", "Queued future transaction for {}, nonce={}", sender, nonce);
|
||||||
let future_nonce = nonce;
|
let future_nonce = nonce;
|
||||||
acct_txs.future.insert(future_nonce, tx_info);
|
acct_txs.future.insert(future_nonce, tx_info);
|
||||||
|
|
||||||
transaction::ImportResult::Future
|
(transaction::ImportResult::Future, vec![])
|
||||||
} else {
|
} else {
|
||||||
trace!(target: "txqueue", "Queued current transaction for {}, nonce={}", sender, nonce);
|
trace!(target: "txqueue", "Queued current transaction for {}, nonce={}", sender, nonce);
|
||||||
|
|
||||||
// insert, then check if we've filled any gaps.
|
// insert, then check if we've filled any gaps.
|
||||||
acct_txs.current.insert(idx, tx_info);
|
acct_txs.current.insert(idx, tx_info);
|
||||||
acct_txs.adjust_future();
|
let mut promoted = acct_txs.adjust_future();
|
||||||
|
promoted.insert(0, hash);
|
||||||
|
|
||||||
transaction::ImportResult::Current
|
(transaction::ImportResult::Current, promoted)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -204,6 +225,7 @@ impl TransactionQueue {
|
|||||||
};
|
};
|
||||||
|
|
||||||
self.by_hash.insert(hash, tx);
|
self.by_hash.insert(hash, tx);
|
||||||
|
self.notify(&promoted);
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -324,6 +346,18 @@ impl TransactionQueue {
|
|||||||
pub fn get(&self, hash: &H256) -> Option<&PendingTransaction> {
|
pub fn get(&self, hash: &H256) -> Option<&PendingTransaction> {
|
||||||
self.by_hash.get(&hash)
|
self.by_hash.get(&hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Add a transaction queue listener.
|
||||||
|
pub fn add_listener(&mut self, f: Listener) {
|
||||||
|
self.listeners.push(f);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Notifies all listeners about new pending transaction.
|
||||||
|
fn notify(&self, hashes: &[H256]) {
|
||||||
|
for listener in &self.listeners {
|
||||||
|
listener(hashes)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -248,6 +248,7 @@ struct SealingWork {
|
|||||||
pub struct Miner {
|
pub struct Miner {
|
||||||
// NOTE [ToDr] When locking always lock in this order!
|
// NOTE [ToDr] When locking always lock in this order!
|
||||||
transaction_queue: Arc<RwLock<BanningTransactionQueue>>,
|
transaction_queue: Arc<RwLock<BanningTransactionQueue>>,
|
||||||
|
transaction_listener: RwLock<Vec<Box<Fn(&[H256]) + Send + Sync>>>,
|
||||||
sealing_work: Mutex<SealingWork>,
|
sealing_work: Mutex<SealingWork>,
|
||||||
next_allowed_reseal: Mutex<Instant>,
|
next_allowed_reseal: Mutex<Instant>,
|
||||||
next_mandatory_reseal: RwLock<Instant>,
|
next_mandatory_reseal: RwLock<Instant>,
|
||||||
@ -314,6 +315,7 @@ impl Miner {
|
|||||||
|
|
||||||
Miner {
|
Miner {
|
||||||
transaction_queue: Arc::new(RwLock::new(txq)),
|
transaction_queue: Arc::new(RwLock::new(txq)),
|
||||||
|
transaction_listener: RwLock::new(vec![]),
|
||||||
next_allowed_reseal: Mutex::new(Instant::now()),
|
next_allowed_reseal: Mutex::new(Instant::now()),
|
||||||
next_mandatory_reseal: RwLock::new(Instant::now() + options.reseal_max_period),
|
next_mandatory_reseal: RwLock::new(Instant::now() + options.reseal_max_period),
|
||||||
sealing_block_last_request: Mutex::new(0),
|
sealing_block_last_request: Mutex::new(0),
|
||||||
@ -369,6 +371,11 @@ impl Miner {
|
|||||||
self.map_pending_block(|b| b.header().clone(), latest_block_number)
|
self.map_pending_block(|b| b.header().clone(), latest_block_number)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set a callback to be notified about imported transactions' hashes.
|
||||||
|
pub fn add_transactions_listener(&self, f: Box<Fn(&[H256]) + Send + Sync>) {
|
||||||
|
self.transaction_listener.write().push(f);
|
||||||
|
}
|
||||||
|
|
||||||
fn map_pending_block<F, T>(&self, f: F, latest_block_number: BlockNumber) -> Option<T> where
|
fn map_pending_block<F, T>(&self, f: F, latest_block_number: BlockNumber) -> Option<T> where
|
||||||
F: FnOnce(&ClosedBlock) -> T,
|
F: FnOnce(&ClosedBlock) -> T,
|
||||||
{
|
{
|
||||||
@ -694,8 +701,9 @@ impl Miner {
|
|||||||
) -> Vec<Result<TransactionImportResult, Error>> {
|
) -> Vec<Result<TransactionImportResult, Error>> {
|
||||||
let best_block_header = client.best_block_header().decode();
|
let best_block_header = client.best_block_header().decode();
|
||||||
let insertion_time = client.chain_info().best_block_number;
|
let insertion_time = client.chain_info().best_block_number;
|
||||||
|
let mut inserted = Vec::with_capacity(transactions.len());
|
||||||
|
|
||||||
transactions.into_iter()
|
let results = transactions.into_iter()
|
||||||
.map(|tx| {
|
.map(|tx| {
|
||||||
let hash = tx.hash();
|
let hash = tx.hash();
|
||||||
if client.transaction_block(TransactionId::Hash(hash)).is_some() {
|
if client.transaction_block(TransactionId::Hash(hash)).is_some() {
|
||||||
@ -721,18 +729,28 @@ impl Miner {
|
|||||||
}).unwrap_or(default_origin);
|
}).unwrap_or(default_origin);
|
||||||
|
|
||||||
let details_provider = TransactionDetailsProvider::new(client, &self.service_transaction_action);
|
let details_provider = TransactionDetailsProvider::new(client, &self.service_transaction_action);
|
||||||
match origin {
|
let hash = transaction.hash();
|
||||||
|
let result = match origin {
|
||||||
TransactionOrigin::Local | TransactionOrigin::RetractedBlock => {
|
TransactionOrigin::Local | TransactionOrigin::RetractedBlock => {
|
||||||
Ok(transaction_queue.add(transaction, origin, insertion_time, condition.clone(), &details_provider)?)
|
transaction_queue.add(transaction, origin, insertion_time, condition.clone(), &details_provider)?
|
||||||
},
|
},
|
||||||
TransactionOrigin::External => {
|
TransactionOrigin::External => {
|
||||||
Ok(transaction_queue.add_with_banlist(transaction, insertion_time, &details_provider)?)
|
transaction_queue.add_with_banlist(transaction, insertion_time, &details_provider)?
|
||||||
},
|
},
|
||||||
}
|
};
|
||||||
|
|
||||||
|
inserted.push(hash);
|
||||||
|
Ok(result)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect()
|
.collect();
|
||||||
|
|
||||||
|
for listener in &*self.transaction_listener.read() {
|
||||||
|
listener(&inserted);
|
||||||
|
}
|
||||||
|
|
||||||
|
results
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Are we allowed to do a non-mandatory reseal?
|
/// Are we allowed to do a non-mandatory reseal?
|
||||||
|
@ -295,7 +295,14 @@ impl FullDependencies {
|
|||||||
Api::EthPubSub => {
|
Api::EthPubSub => {
|
||||||
if !for_generic_pubsub {
|
if !for_generic_pubsub {
|
||||||
let client = EthPubSubClient::new(self.client.clone(), self.remote.clone());
|
let client = EthPubSubClient::new(self.client.clone(), self.remote.clone());
|
||||||
self.client.add_notify(client.handler());
|
let h = client.handler();
|
||||||
|
self.miner.add_transactions_listener(Box::new(move |hashes| if let Some(h) = h.upgrade() {
|
||||||
|
h.new_transactions(hashes);
|
||||||
|
}));
|
||||||
|
|
||||||
|
if let Some(h) = client.handler().upgrade() {
|
||||||
|
self.client.add_notify(h);
|
||||||
|
}
|
||||||
handler.extend_with(client.to_delegate());
|
handler.extend_with(client.to_delegate());
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -501,9 +508,13 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
|
|||||||
self.remote.clone(),
|
self.remote.clone(),
|
||||||
self.gas_price_percentile,
|
self.gas_price_percentile,
|
||||||
);
|
);
|
||||||
self.client.add_listener(
|
self.client.add_listener(client.handler() as Weak<_>);
|
||||||
Arc::downgrade(&client.handler()) as Weak<::light::client::LightChainNotify>
|
let h = client.handler();
|
||||||
);
|
self.transaction_queue.write().add_listener(Box::new(move |transactions| {
|
||||||
|
if let Some(h) = h.upgrade() {
|
||||||
|
h.new_transactions(transactions);
|
||||||
|
}
|
||||||
|
}));
|
||||||
handler.extend_with(EthPubSub::to_delegate(client));
|
handler.extend_with(EthPubSub::to_delegate(client));
|
||||||
},
|
},
|
||||||
Api::Personal => {
|
Api::Personal => {
|
||||||
|
@ -16,7 +16,7 @@
|
|||||||
|
|
||||||
//! Eth PUB-SUB rpc implementation.
|
//! Eth PUB-SUB rpc implementation.
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::{Arc, Weak};
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
use jsonrpc_core::{BoxFuture, Result, Error};
|
use jsonrpc_core::{BoxFuture, Result, Error};
|
||||||
@ -50,6 +50,7 @@ pub struct EthPubSubClient<C> {
|
|||||||
handler: Arc<ChainNotificationHandler<C>>,
|
handler: Arc<ChainNotificationHandler<C>>,
|
||||||
heads_subscribers: Arc<RwLock<Subscribers<Client>>>,
|
heads_subscribers: Arc<RwLock<Subscribers<Client>>>,
|
||||||
logs_subscribers: Arc<RwLock<Subscribers<(Client, EthFilter)>>>,
|
logs_subscribers: Arc<RwLock<Subscribers<(Client, EthFilter)>>>,
|
||||||
|
transactions_subscribers: Arc<RwLock<Subscribers<Client>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<C> EthPubSubClient<C> {
|
impl<C> EthPubSubClient<C> {
|
||||||
@ -57,15 +58,19 @@ impl<C> EthPubSubClient<C> {
|
|||||||
pub fn new(client: Arc<C>, remote: Remote) -> Self {
|
pub fn new(client: Arc<C>, remote: Remote) -> Self {
|
||||||
let heads_subscribers = Arc::new(RwLock::new(Subscribers::default()));
|
let heads_subscribers = Arc::new(RwLock::new(Subscribers::default()));
|
||||||
let logs_subscribers = Arc::new(RwLock::new(Subscribers::default()));
|
let logs_subscribers = Arc::new(RwLock::new(Subscribers::default()));
|
||||||
|
let transactions_subscribers = Arc::new(RwLock::new(Subscribers::default()));
|
||||||
|
|
||||||
EthPubSubClient {
|
EthPubSubClient {
|
||||||
handler: Arc::new(ChainNotificationHandler {
|
handler: Arc::new(ChainNotificationHandler {
|
||||||
client,
|
client,
|
||||||
remote,
|
remote,
|
||||||
heads_subscribers: heads_subscribers.clone(),
|
heads_subscribers: heads_subscribers.clone(),
|
||||||
logs_subscribers: logs_subscribers.clone(),
|
logs_subscribers: logs_subscribers.clone(),
|
||||||
|
transactions_subscribers: transactions_subscribers.clone(),
|
||||||
}),
|
}),
|
||||||
heads_subscribers,
|
heads_subscribers,
|
||||||
logs_subscribers,
|
logs_subscribers,
|
||||||
|
transactions_subscribers,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,12 +80,13 @@ impl<C> EthPubSubClient<C> {
|
|||||||
let client = Self::new(client, remote);
|
let client = Self::new(client, remote);
|
||||||
*client.heads_subscribers.write() = Subscribers::new_test();
|
*client.heads_subscribers.write() = Subscribers::new_test();
|
||||||
*client.logs_subscribers.write() = Subscribers::new_test();
|
*client.logs_subscribers.write() = Subscribers::new_test();
|
||||||
|
*client.transactions_subscribers.write() = Subscribers::new_test();
|
||||||
client
|
client
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a chain notification handler.
|
/// Returns a chain notification handler.
|
||||||
pub fn handler(&self) -> Arc<ChainNotificationHandler<C>> {
|
pub fn handler(&self) -> Weak<ChainNotificationHandler<C>> {
|
||||||
self.handler.clone()
|
Arc::downgrade(&self.handler)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -111,6 +117,7 @@ pub struct ChainNotificationHandler<C> {
|
|||||||
remote: Remote,
|
remote: Remote,
|
||||||
heads_subscribers: Arc<RwLock<Subscribers<Client>>>,
|
heads_subscribers: Arc<RwLock<Subscribers<Client>>>,
|
||||||
logs_subscribers: Arc<RwLock<Subscribers<(Client, EthFilter)>>>,
|
logs_subscribers: Arc<RwLock<Subscribers<(Client, EthFilter)>>>,
|
||||||
|
transactions_subscribers: Arc<RwLock<Subscribers<Client>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<C> ChainNotificationHandler<C> {
|
impl<C> ChainNotificationHandler<C> {
|
||||||
@ -164,6 +171,15 @@ impl<C> ChainNotificationHandler<C> {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Notify all subscribers about new transaction hashes.
|
||||||
|
pub fn new_transactions(&self, hashes: &[H256]) {
|
||||||
|
for subscriber in self.transactions_subscribers.read().values() {
|
||||||
|
for hash in hashes {
|
||||||
|
Self::notify(&self.remote, subscriber, pubsub::Result::TransactionHash((*hash).into()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A light client wrapper struct.
|
/// A light client wrapper struct.
|
||||||
@ -256,16 +272,23 @@ impl<C: Send + Sync + 'static> EthPubSub for EthPubSubClient<C> {
|
|||||||
self.heads_subscribers.write().push(subscriber);
|
self.heads_subscribers.write().push(subscriber);
|
||||||
return;
|
return;
|
||||||
},
|
},
|
||||||
|
(pubsub::Kind::NewHeads, _) => {
|
||||||
|
errors::invalid_params("newHeads", "Expected no parameters.")
|
||||||
|
},
|
||||||
(pubsub::Kind::Logs, Some(pubsub::Params::Logs(filter))) => {
|
(pubsub::Kind::Logs, Some(pubsub::Params::Logs(filter))) => {
|
||||||
self.logs_subscribers.write().push(subscriber, filter.into());
|
self.logs_subscribers.write().push(subscriber, filter.into());
|
||||||
return;
|
return;
|
||||||
},
|
},
|
||||||
(pubsub::Kind::NewHeads, _) => {
|
|
||||||
errors::invalid_params("newHeads", "Expected no parameters.")
|
|
||||||
},
|
|
||||||
(pubsub::Kind::Logs, _) => {
|
(pubsub::Kind::Logs, _) => {
|
||||||
errors::invalid_params("logs", "Expected a filter object.")
|
errors::invalid_params("logs", "Expected a filter object.")
|
||||||
},
|
},
|
||||||
|
(pubsub::Kind::NewPendingTransactions, None) => {
|
||||||
|
self.transactions_subscribers.write().push(subscriber);
|
||||||
|
return;
|
||||||
|
},
|
||||||
|
(pubsub::Kind::NewPendingTransactions, _) => {
|
||||||
|
errors::invalid_params("newPendingTransactions", "Expected no parameters.")
|
||||||
|
},
|
||||||
_ => {
|
_ => {
|
||||||
errors::unimplemented(None)
|
errors::unimplemented(None)
|
||||||
},
|
},
|
||||||
@ -277,7 +300,8 @@ impl<C: Send + Sync + 'static> EthPubSub for EthPubSubClient<C> {
|
|||||||
fn unsubscribe(&self, id: SubscriptionId) -> Result<bool> {
|
fn unsubscribe(&self, id: SubscriptionId) -> Result<bool> {
|
||||||
let res = self.heads_subscribers.write().remove(&id).is_some();
|
let res = self.heads_subscribers.write().remove(&id).is_some();
|
||||||
let res2 = self.logs_subscribers.write().remove(&id).is_some();
|
let res2 = self.logs_subscribers.write().remove(&id).is_some();
|
||||||
|
let res3 = self.transactions_subscribers.write().remove(&id).is_some();
|
||||||
|
|
||||||
Ok(res || res2)
|
Ok(res || res2 || res3)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -37,7 +37,7 @@ fn should_subscribe_to_new_heads() {
|
|||||||
let h1 = client.block_hash_delta_minus(3);
|
let h1 = client.block_hash_delta_minus(3);
|
||||||
|
|
||||||
let pubsub = EthPubSubClient::new_test(Arc::new(client), el.remote());
|
let pubsub = EthPubSubClient::new_test(Arc::new(client), el.remote());
|
||||||
let handler = pubsub.handler();
|
let handler = pubsub.handler().upgrade().unwrap();
|
||||||
let pubsub = pubsub.to_delegate();
|
let pubsub = pubsub.to_delegate();
|
||||||
|
|
||||||
let mut io = MetaIoHandler::default();
|
let mut io = MetaIoHandler::default();
|
||||||
@ -109,7 +109,7 @@ fn should_subscribe_to_logs() {
|
|||||||
]);
|
]);
|
||||||
|
|
||||||
let pubsub = EthPubSubClient::new_test(Arc::new(client), el.remote());
|
let pubsub = EthPubSubClient::new_test(Arc::new(client), el.remote());
|
||||||
let handler = pubsub.handler();
|
let handler = pubsub.handler().upgrade().unwrap();
|
||||||
let pubsub = pubsub.to_delegate();
|
let pubsub = pubsub.to_delegate();
|
||||||
|
|
||||||
let mut io = MetaIoHandler::default();
|
let mut io = MetaIoHandler::default();
|
||||||
@ -150,6 +150,54 @@ fn should_subscribe_to_logs() {
|
|||||||
assert_eq!(res, None);
|
assert_eq!(res, None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn should_subscribe_to_pending_transactions() {
|
||||||
|
// given
|
||||||
|
let el = EventLoop::spawn();
|
||||||
|
let client = TestBlockChainClient::new();
|
||||||
|
|
||||||
|
let pubsub = EthPubSubClient::new_test(Arc::new(client), el.remote());
|
||||||
|
let handler = pubsub.handler().upgrade().unwrap();
|
||||||
|
let pubsub = pubsub.to_delegate();
|
||||||
|
|
||||||
|
let mut io = MetaIoHandler::default();
|
||||||
|
io.extend_with(pubsub);
|
||||||
|
|
||||||
|
let mut metadata = Metadata::default();
|
||||||
|
let (sender, receiver) = futures::sync::mpsc::channel(8);
|
||||||
|
metadata.session = Some(Arc::new(Session::new(sender)));
|
||||||
|
|
||||||
|
// Fail if params are provided
|
||||||
|
let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["newPendingTransactions", {}], "id": 1}"#;
|
||||||
|
let response = r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Couldn't parse parameters: newPendingTransactions","data":"\"Expected no parameters.\""},"id":1}"#;
|
||||||
|
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
|
||||||
|
|
||||||
|
// Subscribe
|
||||||
|
let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["newPendingTransactions"], "id": 1}"#;
|
||||||
|
let response = r#"{"jsonrpc":"2.0","result":"0x416d77337e24399d","id":1}"#;
|
||||||
|
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
|
||||||
|
|
||||||
|
// Send new transactions
|
||||||
|
handler.new_transactions(&[5.into(), 7.into()]);
|
||||||
|
|
||||||
|
let (res, receiver) = receiver.into_future().wait().unwrap();
|
||||||
|
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x0000000000000000000000000000000000000000000000000000000000000005","subscription":"0x416d77337e24399d"}}"#;
|
||||||
|
assert_eq!(res, Some(response.into()));
|
||||||
|
|
||||||
|
let (res, receiver) = receiver.into_future().wait().unwrap();
|
||||||
|
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x0000000000000000000000000000000000000000000000000000000000000007","subscription":"0x416d77337e24399d"}}"#;
|
||||||
|
assert_eq!(res, Some(response.into()));
|
||||||
|
|
||||||
|
// And unsubscribe
|
||||||
|
let request = r#"{"jsonrpc": "2.0", "method": "eth_unsubscribe", "params": ["0x416d77337e24399d"], "id": 1}"#;
|
||||||
|
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;
|
||||||
|
assert_eq!(io.handle_request_sync(request, metadata), Some(response.to_owned()));
|
||||||
|
|
||||||
|
let (res, _receiver) = receiver.into_future().wait().unwrap();
|
||||||
|
assert_eq!(res, None);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn should_return_unimplemented() {
|
fn should_return_unimplemented() {
|
||||||
// given
|
// given
|
||||||
@ -167,8 +215,6 @@ fn should_return_unimplemented() {
|
|||||||
|
|
||||||
// Subscribe
|
// Subscribe
|
||||||
let response = r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"This request is not implemented yet. Please create an issue on Github repo."},"id":1}"#;
|
let response = r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"This request is not implemented yet. Please create an issue on Github repo."},"id":1}"#;
|
||||||
let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["newPendingTransactions"], "id": 1}"#;
|
|
||||||
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
|
|
||||||
let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["syncing"], "id": 1}"#;
|
let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["syncing"], "id": 1}"#;
|
||||||
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
|
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
|
||||||
}
|
}
|
||||||
|
@ -19,7 +19,7 @@
|
|||||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||||
use serde::de::Error;
|
use serde::de::Error;
|
||||||
use serde_json::{Value, from_value};
|
use serde_json::{Value, from_value};
|
||||||
use v1::types::{RichHeader, Filter, Log};
|
use v1::types::{RichHeader, Filter, Log, H256};
|
||||||
|
|
||||||
/// Subscription result.
|
/// Subscription result.
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
@ -28,6 +28,8 @@ pub enum Result {
|
|||||||
Header(RichHeader),
|
Header(RichHeader),
|
||||||
/// Log
|
/// Log
|
||||||
Log(Log),
|
Log(Log),
|
||||||
|
/// Transaction hash
|
||||||
|
TransactionHash(H256),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Serialize for Result {
|
impl Serialize for Result {
|
||||||
@ -37,6 +39,7 @@ impl Serialize for Result {
|
|||||||
match *self {
|
match *self {
|
||||||
Result::Header(ref header) => header.serialize(serializer),
|
Result::Header(ref header) => header.serialize(serializer),
|
||||||
Result::Log(ref log) => log.serialize(serializer),
|
Result::Log(ref log) => log.serialize(serializer),
|
||||||
|
Result::TransactionHash(ref hash) => hash.serialize(serializer),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user