50 lines
1.3 KiB
Python
50 lines
1.3 KiB
Python
# third-party imports
|
|
import celery
|
|
|
|
# local imports
|
|
from cic_eth.admin.nonce import shift_nonce
|
|
from cic_eth.queue.tx import create as queue_create
|
|
from cic_eth.eth.tx import otx_cache_parse_tx
|
|
from cic_eth.eth.task import sign_tx
|
|
|
|
def test_shift_nonce(
|
|
default_chain_spec,
|
|
init_database,
|
|
init_w3,
|
|
celery_session_worker,
|
|
):
|
|
|
|
chain_str = str(default_chain_spec)
|
|
|
|
tx_hashes = []
|
|
for i in range(5):
|
|
tx = {
|
|
'from': init_w3.eth.accounts[0],
|
|
'to': init_w3.eth.accounts[i],
|
|
'nonce': i,
|
|
'gas': 21000,
|
|
'gasPrice': 1000000,
|
|
'value': 128,
|
|
'chainId': default_chain_spec.chain_id(),
|
|
'data': '',
|
|
}
|
|
|
|
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str)
|
|
queue_create(tx['nonce'], init_w3.eth.accounts[0], tx_hash_hex, tx_signed_raw_hex, chain_str)
|
|
otx_cache_parse_tx(tx_hash_hex, tx_signed_raw_hex, chain_str)
|
|
tx_hashes.append(tx_hash_hex)
|
|
|
|
s = celery.signature(
|
|
'cic_eth.admin.nonce.shift_nonce',
|
|
[
|
|
chain_str,
|
|
tx_hashes[2],
|
|
],
|
|
queue=None,
|
|
)
|
|
t = s.apply_async()
|
|
r = t.get()
|
|
for _ in t.collect():
|
|
pass
|
|
assert t.successful()
|