Add test for nonce txs, paused txs queries

This commit is contained in:
nolash 2021-04-02 14:59:13 +02:00
parent 39afbb71c1
commit fa9a992d9c
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
2 changed files with 166 additions and 3 deletions

View File

@ -180,7 +180,7 @@ def get_paused_tx_cache(chain_spec, status=None, sender=None, session=None, deco
return txs
def get_status_tx(chain_spec, status, not_status=None, before=None, exact=False, limit=0, session=None):
def get_status_tx_cache(chain_spec, status, not_status=None, before=None, exact=False, limit=0, session=None, decoder=None):
"""Retrieve transaction with a specific queue status.
:param status: Status to match transactions with
@ -196,7 +196,6 @@ def get_status_tx(chain_spec, status, not_status=None, before=None, exact=False,
session = SessionBase.bind_session(session)
q = session.query(Otx)
q = q.join(TxCache)
# before = datetime.datetime.utcnow()
if before != None:
q = q.filter(TxCache.date_updated<before)
if exact:
@ -216,7 +215,7 @@ def get_status_tx(chain_spec, status, not_status=None, before=None, exact=False,
return txs
def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, session=None):
def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, session=None, decoder=None):
"""Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions.
Will omit addresses that have the LockEnum.SEND bit in Lock set.

164
tests/test_query.py Normal file
View File

@ -0,0 +1,164 @@
# standard imports
import os
import logging
import unittest
# local imports
from chainqueue.query import *
from chainqueue.tx import create
from chainqueue.state import set_waitforgas
from hexathon import (
add_0x,
strip_0x,
)
# test imports
from tests.base import TestTxBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
class TestTxQuery(TestTxBase):
def test_get_tx(self):
tx = get_tx(self.chain_spec, self.tx_hash)
expected_keys = [
'otx_id',
'status',
'signed_tx',
'nonce',
]
for k in tx.keys():
expected_keys.remove(k)
self.assertEqual(len(expected_keys), 0)
def test_nonce_tx(self):
nonce_hashes = [self.tx_hash]
tx_hash = add_0x(os.urandom(32).hex())
signed_tx = add_0x(os.urandom(128).hex())
create(
42,
self.alice,
tx_hash,
signed_tx,
self.chain_spec,
session=self.session,
)
txc = TxCache(
tx_hash,
self.alice,
self.bob,
self.foo_token,
self.bar_token,
self.from_value,
self.to_value,
session=self.session,
)
self.session.add(txc)
self.session.commit()
nonce_hashes.append(tx_hash)
tx_hash = add_0x(os.urandom(32).hex())
signed_tx = add_0x(os.urandom(128).hex())
create(
41,
self.alice,
tx_hash,
signed_tx,
self.chain_spec,
session=self.session,
)
txc = TxCache(
tx_hash,
self.alice,
self.bob,
self.foo_token,
self.bar_token,
self.from_value,
self.to_value,
session=self.session,
)
self.session.add(txc)
txs = get_nonce_tx_cache(self.chain_spec, 42, self.alice)
self.assertEqual(len(txs.keys()), 2)
for h in nonce_hashes:
self.assertTrue(strip_0x(h) in txs)
def test_paused_tx_cache(self):
set_waitforgas(self.tx_hash)
tx_hash = add_0x(os.urandom(32).hex())
signed_tx = add_0x(os.urandom(128).hex())
create(
43,
self.alice,
tx_hash,
signed_tx,
self.chain_spec,
session=self.session,
)
txc = TxCache(
tx_hash,
self.alice,
self.bob,
self.foo_token,
self.bar_token,
self.from_value,
self.to_value,
session=self.session,
)
self.session.add(txc)
self.session.commit()
txs = get_paused_tx_cache(self.chain_spec, status=StatusBits.GAS_ISSUES, sender=self.alice, session=self.session)
self.assertEqual(len(txs.keys()), 1)
txs = get_paused_tx_cache(self.chain_spec, status=StatusBits.GAS_ISSUES, session=self.session)
self.assertEqual(len(txs.keys()), 1)
tx_hash = add_0x(os.urandom(32).hex())
signed_tx = add_0x(os.urandom(128).hex())
create(
42,
self.bob,
tx_hash,
signed_tx,
self.chain_spec,
session=self.session,
)
txc = TxCache(
tx_hash,
self.bob,
self.alice,
self.bar_token,
self.foo_token,
self.to_value,
self.from_value,
session=self.session,
)
self.session.add(txc)
self.session.commit()
txs = get_paused_tx_cache(self.chain_spec, status=StatusBits.GAS_ISSUES, session=self.session)
self.assertEqual(len(txs.keys()), 1)
set_waitforgas(tx_hash)
self.session.commit()
txs = get_paused_tx_cache(self.chain_spec, status=StatusBits.GAS_ISSUES, session=self.session)
self.assertEqual(len(txs.keys()), 2)
txs = get_paused_tx_cache(self.chain_spec, status=StatusBits.GAS_ISSUES, sender=self.bob, session=self.session)
self.assertEqual(len(txs.keys()), 1)
if __name__ == '__main__':
unittest.main()