Rehabilitate queue tests

This commit is contained in:
nolash 2021-03-20 21:10:19 +01:00
parent 788aa9d7b9
commit 964952e904
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
4 changed files with 485 additions and 421 deletions

View File

@ -19,7 +19,7 @@ celery_app = celery.current_app
logg = logging.getLogger() logg = logging.getLogger()
@celery_app.task(base=CriticalSQLAlchemyTask) @celery_app.task(base=CriticalSQLAlchemyTask)
def lock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL, tx_hash=None): def lock(chained_input, chain_spec_dict, address=zero_address, flags=LockEnum.ALL, tx_hash=None):
"""Task wrapper to set arbitrary locks """Task wrapper to set arbitrary locks
:param chain_str: Chain spec string representation :param chain_str: Chain spec string representation
@ -31,13 +31,14 @@ def lock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL, tx_
:returns: New lock state for address :returns: New lock state for address
:rtype: number :rtype: number
""" """
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, flags, address=address, tx_hash=tx_hash) r = Lock.set(chain_str, flags, address=address, tx_hash=tx_hash)
logg.debug('Locked {} for {}, flag now {}'.format(flags, address, r)) logg.debug('Locked {} for {}, flag now {}'.format(flags, address, r))
return chained_input return chained_input
@celery_app.task(base=CriticalSQLAlchemyTask) @celery_app.task(base=CriticalSQLAlchemyTask)
def unlock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL): def unlock(chained_input, chain_spec_dict, address=zero_address, flags=LockEnum.ALL):
"""Task wrapper to reset arbitrary locks """Task wrapper to reset arbitrary locks
:param chain_str: Chain spec string representation :param chain_str: Chain spec string representation
@ -49,13 +50,14 @@ def unlock(chained_input, chain_str, address=zero_address, flags=LockEnum.ALL):
:returns: New lock state for address :returns: New lock state for address
:rtype: number :rtype: number
""" """
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, flags, address=address) r = Lock.reset(chain_str, flags, address=address)
logg.debug('Unlocked {} for {}, flag now {}'.format(flags, address, r)) logg.debug('Unlocked {} for {}, flag now {}'.format(flags, address, r))
return chained_input return chained_input
@celery_app.task(base=CriticalSQLAlchemyTask) @celery_app.task(base=CriticalSQLAlchemyTask)
def lock_send(chained_input, chain_str, address=zero_address, tx_hash=None): def lock_send(chained_input, chain_spec_dict, address=zero_address, tx_hash=None):
"""Task wrapper to set send lock """Task wrapper to set send lock
:param chain_str: Chain spec string representation :param chain_str: Chain spec string representation
@ -65,13 +67,14 @@ def lock_send(chained_input, chain_str, address=zero_address, tx_hash=None):
:returns: New lock state for address :returns: New lock state for address
:rtype: number :rtype: number
""" """
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, LockEnum.SEND, address=address, tx_hash=tx_hash) r = Lock.set(chain_str, LockEnum.SEND, address=address, tx_hash=tx_hash)
logg.debug('Send locked for {}, flag now {}'.format(address, r)) logg.debug('Send locked for {}, flag now {}'.format(address, r))
return chained_input return chained_input
@celery_app.task(base=CriticalSQLAlchemyTask) @celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_send(chained_input, chain_str, address=zero_address): def unlock_send(chained_input, chain_spec_dict, address=zero_address):
"""Task wrapper to reset send lock """Task wrapper to reset send lock
:param chain_str: Chain spec string representation :param chain_str: Chain spec string representation
@ -81,13 +84,14 @@ def unlock_send(chained_input, chain_str, address=zero_address):
:returns: New lock state for address :returns: New lock state for address
:rtype: number :rtype: number
""" """
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, LockEnum.SEND, address=address) r = Lock.reset(chain_str, LockEnum.SEND, address=address)
logg.debug('Send unlocked for {}, flag now {}'.format(address, r)) logg.debug('Send unlocked for {}, flag now {}'.format(address, r))
return chained_input return chained_input
@celery_app.task(base=CriticalSQLAlchemyTask) @celery_app.task(base=CriticalSQLAlchemyTask)
def lock_queue(chained_input, chain_str, address=zero_address, tx_hash=None): def lock_queue(chained_input, chain_spec_dict, address=zero_address, tx_hash=None):
"""Task wrapper to set queue direct lock """Task wrapper to set queue direct lock
:param chain_str: Chain spec string representation :param chain_str: Chain spec string representation
@ -97,13 +101,14 @@ def lock_queue(chained_input, chain_str, address=zero_address, tx_hash=None):
:returns: New lock state for address :returns: New lock state for address
:rtype: number :rtype: number
""" """
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.set(chain_str, LockEnum.QUEUE, address=address, tx_hash=tx_hash) r = Lock.set(chain_str, LockEnum.QUEUE, address=address, tx_hash=tx_hash)
logg.debug('Queue direct locked for {}, flag now {}'.format(address, r)) logg.debug('Queue direct locked for {}, flag now {}'.format(address, r))
return chained_input return chained_input
@celery_app.task(base=CriticalSQLAlchemyTask) @celery_app.task(base=CriticalSQLAlchemyTask)
def unlock_queue(chained_input, chain_str, address=zero_address): def unlock_queue(chained_input, chain_spec_dict, address=zero_address):
"""Task wrapper to reset queue direct lock """Task wrapper to reset queue direct lock
:param chain_str: Chain spec string representation :param chain_str: Chain spec string representation
@ -113,13 +118,15 @@ def unlock_queue(chained_input, chain_str, address=zero_address):
:returns: New lock state for address :returns: New lock state for address
:rtype: number :rtype: number
""" """
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
r = Lock.reset(chain_str, LockEnum.QUEUE, address=address) r = Lock.reset(chain_str, LockEnum.QUEUE, address=address)
logg.debug('Queue direct unlocked for {}, flag now {}'.format(address, r)) logg.debug('Queue direct unlocked for {}, flag now {}'.format(address, r))
return chained_input return chained_input
@celery_app.task(base=CriticalSQLAlchemyTask) @celery_app.task(base=CriticalSQLAlchemyTask)
def check_lock(chained_input, chain_str, lock_flags, address=None): def check_lock(chained_input, chain_spec_dict, lock_flags, address=None):
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
session = SessionBase.create_session() session = SessionBase.create_session()
r = Lock.check(chain_str, lock_flags, address=zero_address, session=session) r = Lock.check(chain_str, lock_flags, address=zero_address, session=session)
if address != None: if address != None:

View File

@ -68,13 +68,12 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_spec, obsolete_prede
session.flush() session.flush()
if obsolete_predecessors: if obsolete_predecessors:
# TODO: obsolete previous txs from same holder with same nonce
q = session.query(Otx) q = session.query(Otx)
q = q.join(TxCache) q = q.join(TxCache)
q = q.filter(Otx.nonce==nonce) q = q.filter(Otx.nonce==nonce)
q = q.filter(TxCache.sender==holder_address) q = q.filter(TxCache.sender==holder_address)
q = q.filter(Otx.tx_hash!=tx_hash) q = q.filter(Otx.tx_hash!=tx_hash)
q = q.filter(Otx.status<=StatusEnum.SENT) q = q.filter(Otx.status.op('&')(StatusBits.FINAL)==0)
for otx in q.all(): for otx in q.all():
logg.info('otx {} obsoleted by {}'.format(otx.tx_hash, tx_hash)) logg.info('otx {} obsoleted by {}'.format(otx.tx_hash, tx_hash))

View File

@ -3,16 +3,18 @@ import datetime
import os import os
import logging import logging
# third-party imports # external imports
import pytest import pytest
from sqlalchemy import DateTime from sqlalchemy import DateTime
#from cic_registry import CICRegistry
from chainlib.connection import RPCConnection from chainlib.connection import RPCConnection
from chainlib.eth.nonce import OverrideNonceOracle from chainlib.eth.nonce import OverrideNonceOracle
from chainlib.eth.tx import unpack
from chainlib.eth.gas import ( from chainlib.eth.gas import (
RPCGasOracle, RPCGasOracle,
Gas, Gas,
) )
from chainlib.eth.constant import ZERO_ADDRESS
from hexathon import strip_0x
# local imports # local imports
from cic_eth.eth.tx import cache_gas_data from cic_eth.eth.tx import cache_gas_data
@ -39,7 +41,10 @@ from cic_eth.queue.tx import get_upcoming_tx
from cic_eth.queue.tx import get_account_tx from cic_eth.queue.tx import get_account_tx
from cic_eth.queue.tx import get_tx from cic_eth.queue.tx import get_tx
from cic_eth.db.error import TxStateChangeError from cic_eth.db.error import TxStateChangeError
from cic_eth.queue.tx import registry_tx from cic_eth.queue.tx import register_tx
# test imports
from tests.util.nonce import StaticNonceOracle
logg = logging.getLogger() logg = logging.getLogger()
@ -53,7 +58,7 @@ def test_finalize(
): ):
rpc = RPCConnection.connect(default_chain_spec, 'default') rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = OverrideNonceOracle(agent_roles['ALICE'], 0) nonce_oracle = StaticNonceOracle(0)
gas_oracle = RPCGasOracle(eth_rpc) gas_oracle = RPCGasOracle(eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
@ -64,7 +69,7 @@ def test_finalize(
c.create(agent_roles['ALICE'], agent_roles['BOB'], 400 * (10 ** 6)), c.create(agent_roles['ALICE'], agent_roles['BOB'], 400 * (10 ** 6)),
] ]
nonce_oracle = OverrideNonceOracle(agent_roles['ALICE'], 1) nonce_oracle = StaticNonceOracle(1)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id()) c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
txs_rpc.append(c.create(agent_roles['ALICE'], agent_roles['BOB'], 500 * (10 ** 6))) txs_rpc.append(c.create(agent_roles['ALICE'], agent_roles['BOB'], 500 * (10 ** 6)))
@ -80,8 +85,8 @@ def test_finalize(
tx_hashes.append(tx_hash_hex) tx_hashes.append(tx_hash_hex)
if i < 4: if i < 3:
set_sent_status(tx_hash.hex()) set_sent_status(tx_hash_hex)
i += 1 i += 1
@ -124,408 +129,449 @@ def test_finalize(
assert not is_error_status(otx.status) assert not is_error_status(otx.status)
#def test_expired( def test_expired(
# default_chain_spec, default_chain_spec,
# init_database, init_database,
# init_w3, eth_rpc,
# ): eth_signer,
# agent_roles,
# tx_hashes = [] ):
# for i in range(1, 6):
# tx = { rpc = RPCConnection.connect(default_chain_spec, 'default')
# 'from': init_w3.eth.accounts[0], nonce_oracle = StaticNonceOracle(42)
# 'to': init_w3.eth.accounts[1], gas_oracle = RPCGasOracle(eth_rpc)
# 'nonce': 42 + int(i/2), c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
# 'gas': 21000,
# 'gasPrice': 1000000*i, txs_rpc = [
# 'value': 128, c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6)),
# 'chainId': 666, c.create(agent_roles['ALICE'], agent_roles['BOB'], 200 * (10 ** 6)),
# 'data': '0x', ]
# }
# tx_signed = init_w3.eth.sign_transaction(tx) nonce_oracle = StaticNonceOracle(43)
# #tx_hash = RpcClient.w3.keccak(hexstr=tx_signed['raw']) c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
# tx_hash = init_w3.keccak(hexstr=tx_signed['raw']) txs_rpc += [
# queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec)) c.create(agent_roles['ALICE'], agent_roles['BOB'], 300 * (10 ** 6)),
# cache_gas_refill_data(tx_hash.hex(), tx) c.create(agent_roles['ALICE'], agent_roles['BOB'], 400 * (10 ** 6)),
# tx_hashes.append(tx_hash.hex()) ]
# set_sent_status(tx_hash.hex(), False)
# otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash.hex()).first() nonce_oracle = StaticNonceOracle(44)
# fake_created = datetime.datetime.utcnow() - datetime.timedelta(seconds=40*i) c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
# otx.date_created = fake_created txs_rpc.append(c.create(agent_roles['ALICE'], agent_roles['BOB'], 500 * (10 ** 6)))
# init_database.add(otx)
# init_database.commit() tx_hashes = []
# init_database.refresh(otx)
# i = 0
# now = datetime.datetime.utcnow() for entry in txs_rpc:
# delta = datetime.timedelta(seconds=61) tx_hash_hex = entry[0]
# then = now - delta tx_rpc = entry[1]
# tx_signed_raw_hex = tx_rpc['params'][0]
# otxs = OtxSync.get_expired(then)
# nonce_acc = 0 register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
# for otx in otxs: cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
# nonce_acc += otx.nonce
# tx_hashes.append(tx_hash_hex)
# assert nonce_acc == (43 + 44)
# set_sent_status(tx_hash_hex, False)
#
#def test_get_paused( otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first()
# init_w3, fake_created = datetime.datetime.utcnow() - datetime.timedelta(seconds=40*i)
# init_database, otx.date_created = fake_created
# cic_registry, init_database.add(otx)
# default_chain_spec, init_database.commit()
# ): init_database.refresh(otx)
#
# tx_hashes = [] i += 1
# for i in range(1, 3):
# tx = { now = datetime.datetime.utcnow()
# 'from': init_w3.eth.accounts[0], delta = datetime.timedelta(seconds=61)
# 'to': init_w3.eth.accounts[1], then = now - delta
# 'nonce': 42 + int(i),
# 'gas': 21000, otxs = OtxSync.get_expired(then)
# 'gasPrice': 1000000*i, nonce_acc = 0
# 'value': 128, for otx in otxs:
# 'chainId': 8995, nonce_acc += otx.nonce
# 'data': '0x',
# } assert nonce_acc == (43 + 44)
# logg.debug('nonce {}'.format(tx['nonce']))
# tx_signed = init_w3.eth.sign_transaction(tx)
# #tx_hash = RpcClient.w3.keccak(hexstr=tx_signed['raw']) def test_get_paused(
# tx_hash = init_w3.keccak(hexstr=tx_signed['raw']) init_database,
# queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec)) default_chain_spec,
# cache_gas_refill_data(tx_hash.hex(), tx) eth_rpc,
# tx_hashes.append(tx_hash.hex()) eth_signer,
# agent_roles,
# #txs = get_paused_txs(recipient=init_w3.eth.accounts[1]) ):
# txs = get_paused_txs(sender=init_w3.eth.accounts[0])
# assert len(txs.keys()) == 0 chain_id = default_chain_spec.chain_id()
# rpc = RPCConnection.connect(default_chain_spec, 'default')
# q = init_database.query(Otx) nonce_oracle = OverrideNonceOracle(agent_roles['ALICE'], 42)
# q = q.filter(Otx.tx_hash==tx_hashes[0]) gas_oracle = RPCGasOracle(eth_rpc)
# r = q.first() c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
# r.waitforgas(session=init_database)
# init_database.add(r) txs_rpc = [
# init_database.commit() c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6)),
# c.create(agent_roles['ALICE'], agent_roles['BOB'], 200 * (10 ** 6)),
# chain_id = default_chain_spec.chain_id() ]
# txs = get_paused_txs(chain_id=chain_id)
# assert len(txs.keys()) == 1 tx_hashes = []
# for entry in txs_rpc:
# #txs = get_paused_txs(recipient=init_w3.eth.accounts[1]) tx_hash_hex = entry[0]
# txs = get_paused_txs(sender=init_w3.eth.accounts[0], chain_id=chain_id) tx_rpc = entry[1]
# assert len(txs.keys()) == 1 tx_signed_raw_hex = tx_rpc['params'][0]
#
# txs = get_paused_txs(status=StatusEnum.WAITFORGAS) register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
# assert len(txs.keys()) == 1 cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
#
# #txs = get_paused_txs(recipient=init_w3.eth.accounts[1], status=StatusEnum.WAITFORGAS) tx_hashes.append(tx_hash_hex)
# txs = get_paused_txs(sender=init_w3.eth.accounts[0], status=StatusEnum.WAITFORGAS, chain_id=chain_id)
# assert len(txs.keys()) == 1 txs = get_paused_txs(sender=agent_roles['ALICE'], chain_id=chain_id)
# assert len(txs.keys()) == 0
#
# q = init_database.query(Otx) q = init_database.query(Otx)
# q = q.filter(Otx.tx_hash==tx_hashes[1]) q = q.filter(Otx.tx_hash==tx_hashes[0])
# o = q.first() r = q.first()
# o.waitforgas(session=init_database) r.waitforgas(session=init_database)
# init_database.add(o) init_database.add(r)
# init_database.commit() init_database.commit()
#
# txs = get_paused_txs() chain_id = default_chain_spec.chain_id()
# assert len(txs.keys()) == 2 txs = get_paused_txs(chain_id=chain_id)
# assert len(txs.keys()) == 1
# #txs = get_paused_txs(recipient=init_w3.eth.accounts[1])
# txs = get_paused_txs(sender=init_w3.eth.accounts[0], chain_id=chain_id) txs = get_paused_txs(sender=agent_roles['ALICE'], chain_id=chain_id) # init_w3.eth.accounts[0])
# assert len(txs.keys()) == 2 assert len(txs.keys()) == 1
#
# txs = get_paused_txs(status=StatusEnum.WAITFORGAS, chain_id=chain_id) txs = get_paused_txs(status=StatusBits.GAS_ISSUES)
# assert len(txs.keys()) == 2 assert len(txs.keys()) == 1
#
# #txs = get_paused_txs(recipient=init_w3.eth.accounts[1], status=StatusEnum.WAITFORGAS) txs = get_paused_txs(sender=agent_roles['ALICE'], status=StatusBits.GAS_ISSUES, chain_id=chain_id)
# txs = get_paused_txs(sender=init_w3.eth.accounts[0], status=StatusEnum.WAITFORGAS, chain_id=chain_id) assert len(txs.keys()) == 1
# assert len(txs.keys()) == 2
#
# q = init_database.query(Otx)
# q = init_database.query(Otx) q = q.filter(Otx.tx_hash==tx_hashes[1])
# q = q.filter(Otx.tx_hash==tx_hashes[1]) o = q.first()
# o = q.first() o.waitforgas(session=init_database)
# o.sendfail(session=init_database) init_database.add(o)
# init_database.add(o) init_database.commit()
# init_database.commit()
# txs = get_paused_txs()
# txs = get_paused_txs() assert len(txs.keys()) == 2
# assert len(txs.keys()) == 2
# txs = get_paused_txs(sender=agent_roles['ALICE'], chain_id=chain_id) # init_w3.eth.accounts[0])
# txs = get_paused_txs(sender=init_w3.eth.accounts[0], chain_id=chain_id) assert len(txs.keys()) == 2
# assert len(txs.keys()) == 2
# txs = get_paused_txs(status=StatusBits.GAS_ISSUES, chain_id=chain_id)
# txs = get_paused_txs(status=StatusEnum.WAITFORGAS, chain_id=chain_id) assert len(txs.keys()) == 2
# assert len(txs.keys()) == 1
# txs = get_paused_txs(sender=agent_roles['ALICE'], status=StatusBits.GAS_ISSUES, chain_id=chain_id) # init_w3.eth.accounts[0])
# #txs = get_paused_txs(recipient=init_w3.eth.accounts[1], status=StatusEnum.WAITFORGAS) assert len(txs.keys()) == 2
# txs = get_paused_txs(sender=init_w3.eth.accounts[0], status=StatusEnum.WAITFORGAS, chain_id=chain_id)
# assert len(txs.keys()) == 1 q = init_database.query(Otx)
# q = q.filter(Otx.tx_hash==tx_hashes[1])
# o = q.first()
#def test_get_upcoming( o.sendfail(session=init_database)
# default_chain_spec, init_database.add(o)
# init_w3, init_database.commit()
# init_database,
# cic_registry, txs = get_paused_txs()
# ): assert len(txs.keys()) == 2
#
# tx_hashes = [] txs = get_paused_txs(sender=agent_roles['ALICE'], chain_id=chain_id) # init_w3.eth.accounts[0])
# for i in range(0, 7): assert len(txs.keys()) == 2
# tx = {
# 'from': init_w3.eth.accounts[i % 3], txs = get_paused_txs(status=StatusBits.GAS_ISSUES, chain_id=chain_id)
# 'to': init_w3.eth.accounts[1], txs = get_paused_txs(status=StatusEnum.WAITFORGAS, chain_id=chain_id)
# 'nonce': 42 + int(i / 3), assert len(txs.keys()) == 1
# 'gas': 21000,
# 'gasPrice': 1000000*i, txs = get_paused_txs(sender=agent_roles['ALICE'], status=StatusBits.GAS_ISSUES, chain_id=chain_id) # init_w3.eth.accounts[0])
# 'value': 128, assert len(txs.keys()) == 1
# 'chainId': 8995,
# 'data': '0x',
# } def test_get_upcoming(
# tx_signed = init_w3.eth.sign_transaction(tx) default_chain_spec,
# tx_hash = init_w3.keccak(hexstr=tx_signed['raw']) eth_rpc,
# logg.debug('{} nonce {} {}'.format(i, tx['nonce'], tx_hash.hex())) eth_signer,
# queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec)) init_database,
# cache_gas_refill_data(tx_hash.hex(), tx) agent_roles,
# tx_hashes.append(tx_hash.hex()) ):
#
# chain_id = int(default_chain_spec.chain_id()) chain_id = default_chain_spec.chain_id()
# rpc = RPCConnection.connect(default_chain_spec, 'default')
# txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id) nonce_oracle = StaticNonceOracle(42)
# assert len(txs.keys()) == 3 gas_oracle = RPCGasOracle(eth_rpc)
# c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
# tx = unpack_signed_raw_tx(bytes.fromhex(txs[tx_hashes[0]][2:]), chain_id)
# assert tx['nonce'] == 42 txs_rpc = [
# c.create(agent_roles['ALICE'], agent_roles['DAVE'], 100 * (10 ** 6)),
# tx = unpack_signed_raw_tx(bytes.fromhex(txs[tx_hashes[1]][2:]), chain_id) c.create(agent_roles['BOB'], agent_roles['DAVE'], 200 * (10 ** 6)),
# assert tx['nonce'] == 42 c.create(agent_roles['CAROL'], agent_roles['DAVE'], 300 * (10 ** 6)),
# ]
# tx = unpack_signed_raw_tx(bytes.fromhex(txs[tx_hashes[2]][2:]), chain_id)
# assert tx['nonce'] == 42 nonce_oracle = StaticNonceOracle(43)
# c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
# q = init_database.query(TxCache) txs_rpc += [
# q = q.filter(TxCache.sender==init_w3.eth.accounts[0]) c.create(agent_roles['ALICE'], agent_roles['DAVE'], 400 * (10 ** 6)),
# for o in q.all(): c.create(agent_roles['BOB'], agent_roles['DAVE'], 500 * (10 ** 6)),
# o.date_checked -= datetime.timedelta(seconds=30) c.create(agent_roles['CAROL'], agent_roles['DAVE'], 600 * (10 ** 6)),
# init_database.add(o) ]
# init_database.commit()
# nonce_oracle = StaticNonceOracle(44)
# before = datetime.datetime.now() - datetime.timedelta(seconds=20) c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
# logg.debug('before {}'.format(before)) txs_rpc += [
# txs = get_upcoming_tx(StatusEnum.PENDING, before=before) c.create(agent_roles['ALICE'], agent_roles['DAVE'], 700 * (10 ** 6)),
# logg.debug('txs {} {}'.format(txs.keys(), txs.values())) ]
# assert len(txs.keys()) == 1
# tx_hashes = []
# # Now date checked has been set to current time, and the check returns no results for entry in txs_rpc:
# txs = get_upcoming_tx(StatusEnum.PENDING, before=before) tx_hash_hex = entry[0]
# logg.debug('txs {} {}'.format(txs.keys(), txs.values())) tx_rpc = entry[1]
# assert len(txs.keys()) == 0 tx_signed_raw_hex = tx_rpc['params'][0]
#
# set_sent_status(tx_hashes[0]) register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
# cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
# txs = get_upcoming_tx(StatusEnum.PENDING)
# assert len(txs.keys()) == 3 tx_hashes.append(tx_hash_hex)
# with pytest.raises(KeyError):
# tx = txs[tx_hashes[0]] set_ready(tx_hash_hex)
#
# tx = unpack_signed_raw_tx(bytes.fromhex(txs[tx_hashes[3]][2:]), chain_id) txs = get_upcoming_tx(StatusBits.QUEUED, chain_id=chain_id)
# assert tx['nonce'] == 43 assert len(txs.keys()) == 3
#
# set_waitforgas(tx_hashes[1]) tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[0]])), chain_id)
# txs = get_upcoming_tx(StatusEnum.PENDING) assert tx['nonce'] == 42
# assert len(txs.keys()) == 3
# with pytest.raises(KeyError): tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[1]])), chain_id)
# tx = txs[tx_hashes[1]] assert tx['nonce'] == 42
#
# tx = unpack_signed_raw_tx(bytes.fromhex(txs[tx_hashes[3]][2:]), chain_id) tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[2]])), chain_id)
# assert tx['nonce'] == 43 assert tx['nonce'] == 42
#
# q = init_database.query(TxCache)
# txs = get_upcoming_tx(StatusEnum.WAITFORGAS) q = q.filter(TxCache.sender==agent_roles['ALICE'])
# assert len(txs.keys()) == 1 for o in q.all():
# o.date_checked -= datetime.timedelta(seconds=30)
# init_database.add(o)
#def test_upcoming_with_lock( init_database.commit()
# default_chain_spec,
# init_database, before = datetime.datetime.now() - datetime.timedelta(seconds=20)
# init_w3, logg.debug('before {}'.format(before))
# ): txs = get_upcoming_tx(StatusBits.QUEUED, before=before)
# logg.debug('txs {} {}'.format(txs.keys(), txs.values()))
# chain_id = int(default_chain_spec.chain_id()) assert len(txs.keys()) == 1
# chain_str = str(default_chain_spec)
# # Now date checked has been set to current time, and the check returns no results
# tx = { txs = get_upcoming_tx(StatusBits.QUEUED, before=before)
# 'from': init_w3.eth.accounts[0], logg.debug('txs {} {}'.format(txs.keys(), txs.values()))
# 'to': init_w3.eth.accounts[1], assert len(txs.keys()) == 0
# 'nonce': 42,
# 'gas': 21000, set_sent_status(tx_hashes[0])
# 'gasPrice': 1000000,
# 'value': 128, txs = get_upcoming_tx(StatusBits.QUEUED)
# 'chainId': 8995, assert len(txs.keys()) == 3
# 'data': '0x', with pytest.raises(KeyError):
# } tx = txs[tx_hashes[0]]
# tx_signed = init_w3.eth.sign_transaction(tx)
# tx_hash = init_w3.keccak(hexstr=tx_signed['raw']) tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[3]])), chain_id)
# logg.debug('nonce {} {}'.format(tx['nonce'], tx_hash.hex())) assert tx['nonce'] == 43
# queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec))
# cache_gas_refill_data(tx_hash.hex(), tx) set_waitforgas(tx_hashes[1])
# txs = get_upcoming_tx(StatusBits.QUEUED)
# txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id) assert len(txs.keys()) == 3
# assert len(txs.keys()) == 1 with pytest.raises(KeyError):
# tx = txs[tx_hashes[1]]
# Lock.set(chain_str, LockEnum.SEND, address=init_w3.eth.accounts[0])
# tx = unpack(bytes.fromhex(strip_0x(txs[tx_hashes[3]])), chain_id)
# txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id) assert tx['nonce'] == 43
# assert len(txs.keys()) == 0
# txs = get_upcoming_tx(StatusBits.GAS_ISSUES)
# tx = { assert len(txs.keys()) == 1
# 'from': init_w3.eth.accounts[1],
# 'to': init_w3.eth.accounts[0],
# 'nonce': 42, def test_upcoming_with_lock(
# 'gas': 21000, default_chain_spec,
# 'gasPrice': 1000000, init_database,
# 'value': 128, eth_rpc,
# 'chainId': 8995, eth_signer,
# 'data': '0x', agent_roles,
# } ):
# tx_signed = init_w3.eth.sign_transaction(tx)
# tx_hash = init_w3.keccak(hexstr=tx_signed['raw']) chain_id = int(default_chain_spec.chain_id())
# logg.debug('nonce {} {}'.format(tx['nonce'], tx_hash.hex()))
# queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec)) rpc = RPCConnection.connect(default_chain_spec, 'default')
# cache_gas_refill_data(tx_hash.hex(), tx) nonce_oracle = StaticNonceOracle(42)
# gas_oracle = RPCGasOracle(eth_rpc)
# txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id) c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
# assert len(txs.keys()) == 1
# (tx_hash_hex, tx_rpc) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6))
# tx_signed_raw_hex = tx_rpc['params'][0]
#def test_obsoletion(
# default_chain_spec, register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
# init_w3, cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
# init_database,
# ): txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id)
# assert len(txs.keys()) == 1
# tx_hashes = []
# for i in range(0, 4): Lock.set(str(default_chain_spec), LockEnum.SEND, address=agent_roles['ALICE'])
# tx = {
# 'from': init_w3.eth.accounts[int(i/2)], txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id)
# 'to': init_w3.eth.accounts[1], assert len(txs.keys()) == 0
# 'nonce': 42 + int(i/3),
# 'gas': 21000, (tx_hash_hex, tx_rpc) = c.create(agent_roles['BOB'], agent_roles['ALICE'], 100 * (10 ** 6))
# 'gasPrice': 1000000*i, tx_signed_raw_hex = tx_rpc['params'][0]
# 'value': 128,
# 'chainId': 8995, register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
# 'data': '0x', cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
# }
# txs = get_upcoming_tx(StatusEnum.PENDING, chain_id=chain_id)
# logg.debug('nonce {}'.format(tx['nonce'])) assert len(txs.keys()) == 1
# tx_signed = init_w3.eth.sign_transaction(tx)
# tx_hash = init_w3.keccak(hexstr=tx_signed['raw'])
# queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec)) def test_obsoletion(
# cache_gas_refill_data(tx_hash.hex(), tx) default_chain_spec,
# tx_hashes.append(tx_hash.hex()) init_database,
# eth_rpc,
# if i < 2: eth_signer,
# set_sent_status(tx_hash.hex()) agent_roles,
# ):
# session = SessionBase.create_session()
# q = session.query(Otx) chain_id = default_chain_spec.chain_id()
# q = q.filter(Otx.status.op('&')(StatusEnum.OBSOLETED.value)==StatusEnum.OBSOLETED.value) rpc = RPCConnection.connect(default_chain_spec, 'default')
# z = 0 nonce_oracle = StaticNonceOracle(42)
# for o in q.all(): gas_oracle = RPCGasOracle(eth_rpc)
# z += o.nonce c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
#
# session.close() txs_rpc = [
# assert z == 42 c.create(agent_roles['ALICE'], agent_roles['DAVE'], 100 * (10 ** 6)),
# c.create(agent_roles['ALICE'], agent_roles['DAVE'], 200 * (10 ** 6)),
# set_final_status(tx_hashes[1], 362436, True) c.create(agent_roles['BOB'], agent_roles['DAVE'], 300 * (10 ** 6)),
# ]
# session = SessionBase.create_session()
# q = session.query(Otx) nonce_oracle = StaticNonceOracle(43)
# q = q.filter(Otx.status.op('&')(StatusEnum.CANCELLED.value)==StatusEnum.OBSOLETED.value) c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
# zo = 0 txs_rpc += [
# for o in q.all(): c.create(agent_roles['BOB'], agent_roles['DAVE'], 400 * (10 ** 6)),
# zo += o.nonce ]
#
# q = session.query(Otx) tx_hashes = []
# q = q.filter(Otx.status.op('&')(StatusEnum.CANCELLED.value)==StatusEnum.CANCELLED.value) i = 0
# zc = 0 for entry in txs_rpc:
# for o in q.all(): tx_hash_hex = entry[0]
# zc += o.nonce tx_rpc = entry[1]
# tx_signed_raw_hex = tx_rpc['params'][0]
# session.close()
# assert zo == 0 register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
# assert zc == 42 cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
#
# tx_hashes.append(tx_hash_hex)
#def test_retry(
# init_database, if i < 2:
# ): set_sent_status(tx_hash_hex)
#
# address = '0x' + os.urandom(20).hex() i += 1
# tx_hash = '0x' + os.urandom(32).hex()
# signed_tx = '0x' + os.urandom(128).hex() session = SessionBase.create_session()
# otx = Otx(0, address, tx_hash, signed_tx) q = session.query(Otx)
# init_database.add(otx) q = q.filter(Otx.status.op('&')(StatusEnum.OBSOLETED.value)==StatusEnum.OBSOLETED.value)
# init_database.commit() z = 0
# for o in q.all():
# set_sent_status(tx_hash, True) z += o.nonce
# set_ready(tx_hash)
# session.close()
# q = init_database.query(Otx) assert z == 42
# q = q.filter(Otx.tx_hash==tx_hash)
# otx = q.first() set_final_status(tx_hashes[1], 1023, True)
#
# assert (otx.status & StatusEnum.RETRY.value) == StatusEnum.RETRY.value session = SessionBase.create_session()
# assert is_error_status(otx.status) q = session.query(Otx)
# q = q.filter(Otx.status.op('&')(StatusEnum.CANCELLED.value)==StatusEnum.OBSOLETED.value)
# set_sent_status(tx_hash, False) zo = 0
# set_ready(tx_hash) for o in q.all():
# zo += o.nonce
# init_database.commit()
# q = session.query(Otx)
# q = init_database.query(Otx) q = q.filter(Otx.status.op('&')(StatusEnum.CANCELLED.value)==StatusEnum.CANCELLED.value)
# q = q.filter(Otx.tx_hash==tx_hash) zc = 0
# otx = q.first() for o in q.all():
# zc += o.nonce
# assert (otx.status & StatusEnum.RETRY.value) == StatusBits.QUEUED.value
# assert not is_error_status(otx.status) session.close()
# assert zo == 0
# assert zc == 42
#def test_get_account_tx(
# default_chain_spec,
# init_database, def test_retry(
# init_w3, init_database,
# ): ):
#
# tx_hashes = [] address = '0x' + os.urandom(20).hex()
# tx_hash = '0x' + os.urandom(32).hex()
# for i in range(0, 4): signed_tx = '0x' + os.urandom(128).hex()
# otx = Otx(0, address, tx_hash, signed_tx)
# tx = { init_database.add(otx)
# 'from': init_w3.eth.accounts[int(i/3)], init_database.commit()
# 'to': init_w3.eth.accounts[3-i],
# 'nonce': 42 + i, set_sent_status(tx_hash, True)
# 'gas': 21000, set_ready(tx_hash)
# 'gasPrice': 1000000*i,
# 'value': 128, q = init_database.query(Otx)
# 'chainId': 666, q = q.filter(Otx.tx_hash==tx_hash)
# 'data': '', otx = q.first()
# }
# logg.debug('nonce {}'.format(tx['nonce'])) assert (otx.status & StatusEnum.RETRY.value) == StatusEnum.RETRY.value
# tx_signed = init_w3.eth.sign_transaction(tx) assert is_error_status(otx.status)
# tx_hash = init_w3.keccak(hexstr=tx_signed['raw'])
# queue_create(tx['nonce'], tx['from'], tx_hash.hex(), tx_signed['raw'], str(default_chain_spec)) set_sent_status(tx_hash, False)
# cache_gas_refill_data(tx_hash.hex(), tx) set_ready(tx_hash)
# tx_hashes.append(tx_hash.hex())
# init_database.commit()
# txs = get_account_tx(init_w3.eth.accounts[0])
# logg.debug('tx {} tx {}'.format(list(txs.keys()), tx_hashes)) q = init_database.query(Otx)
# assert list(txs.keys()) == tx_hashes q = q.filter(Otx.tx_hash==tx_hash)
# otx = q.first()
# txs = get_account_tx(init_w3.eth.accounts[0], as_recipient=False)
# assert list(txs.keys()) == tx_hashes[:3] assert (otx.status & StatusEnum.RETRY.value) == StatusBits.QUEUED.value
# assert not is_error_status(otx.status)
# txs = get_account_tx(init_w3.eth.accounts[0], as_sender=False)
# assert list(txs.keys()) == tx_hashes[3:]
def test_get_account_tx(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
agent_roles,
):
chain_id = default_chain_spec.chain_id()
rpc = RPCConnection.connect(default_chain_spec, 'default')
nonce_oracle = OverrideNonceOracle(ZERO_ADDRESS, 42)
gas_oracle = RPCGasOracle(eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle, chain_id=default_chain_spec.chain_id())
txs_rpc = [
c.create(agent_roles['ALICE'], agent_roles['DAVE'], 100 * (10 ** 6)),
c.create(agent_roles['ALICE'], agent_roles['CAROL'], 200 * (10 ** 6)),
c.create(agent_roles['ALICE'], agent_roles['BOB'], 300 * (10 ** 6)),
c.create(agent_roles['BOB'], agent_roles['ALICE'], 300 * (10 ** 6)),
]
tx_hashes = []
for entry in txs_rpc:
tx_hash_hex = entry[0]
tx_rpc = entry[1]
tx_signed_raw_hex = tx_rpc['params'][0]
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
tx_hashes.append(tx_hash_hex)
txs = get_account_tx(agent_roles['ALICE'])
logg.debug('tx {} tx {}'.format(list(txs.keys()), tx_hashes))
assert list(txs.keys()) == tx_hashes
txs = get_account_tx(agent_roles['ALICE'], as_recipient=False)
assert list(txs.keys()) == tx_hashes[:3]
txs = get_account_tx(agent_roles['ALICE'], as_sender=False)
assert list(txs.keys()) == tx_hashes[3:]

View File

@ -0,0 +1,12 @@
# external imports
from chainlib.eth.nonce import OverrideNonceOracle
from chainlib.eth.constant import ZERO_ADDRESS
class StaticNonceOracle(OverrideNonceOracle):
def __init__(self, nonce):
super(StaticNonceOracle, self).__init__(ZERO_ADDRESS, nonce)
def next_nonce(self):
return self.nonce