cic-internal-integration/apps/cic-eth/tests/tasks/test_nonce_tasks.py

50 lines
1.3 KiB
Python
Raw Normal View History

2021-02-01 18:12:51 +01:00
# third-party imports
import celery
# local imports
from cic_eth.admin.nonce import shift_nonce
from cic_eth.queue.tx import create as queue_create
from cic_eth.eth.tx import otx_cache_parse_tx
from cic_eth.eth.task import sign_tx
def test_shift_nonce(
        default_chain_spec,
        init_database,
        init_w3,
        celery_session_worker,
        ):
    """Queue five signed transactions, then run the shift_nonce admin task.

    Five transactions with nonces 0..4 are signed from the first account,
    registered in the queue and in the tx cache. The
    ``cic_eth.admin.nonce.shift_nonce`` celery task is then invoked with the
    hash of the third transaction (nonce 2), and the test passes if the task
    finishes successfully.

    NOTE(review): this only checks that the task completes; it makes no
    assertion about the resulting nonces or queue contents.

    :param default_chain_spec: Chain spec fixture; supplies the chain id and,
        via ``str()``, the chain string passed to the helpers and the task.
    :param init_database: Fixture not referenced in the body; presumably
        requested for its database setup side effect -- confirm in conftest.
    :param init_w3: Web3 fixture; ``eth.accounts`` supplies the addresses.
    :param celery_session_worker: Fixture not referenced in the body;
        presumably provides the worker that executes the task -- confirm.
    """
    chain_str = str(default_chain_spec)

    tx_hashes = []
    for i in range(5):
        tx = {
            'from': init_w3.eth.accounts[0],
            'to': init_w3.eth.accounts[i],
            'nonce': i,
            'gas': 21000,
            'gasPrice': 1000000,
            'value': 128,
            'chainId': default_chain_spec.chain_id(),
            'data': '',
        }

        # Sign, then register the transaction in both the queue and the cache.
        (tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str)
        queue_create(tx['nonce'], init_w3.eth.accounts[0], tx_hash_hex, tx_signed_raw_hex, chain_str)
        otx_cache_parse_tx(tx_hash_hex, tx_signed_raw_hex, chain_str)
        tx_hashes.append(tx_hash_hex)

    # Target the middle transaction (nonce 2), leaving queued transactions
    # on either side of it.
    s = celery.signature(
        'cic_eth.admin.nonce.shift_nonce',
        [
            chain_str,
            tx_hashes[2],
        ],
        queue=None,
    )
    t = s.apply_async()

    # Block until the task finishes; the return value is not needed, but the
    # call re-raises any exception the task raised.
    t.get()

    # Drain the result iterator so results of any tasks spawned by the task
    # are waited for as well before checking the final state.
    for _ in t.collect():
        pass
    assert t.successful()