Add fix nonce test, upgrade chainqueue

This commit is contained in:
nolash 2021-05-28 11:12:34 +02:00
parent e54e7cf864
commit 0b5fb22b25
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
5 changed files with 93 additions and 21 deletions

View File

@ -10,8 +10,8 @@ from chainlib.eth.tx import (
TxFactory, TxFactory,
) )
from chainlib.eth.gas import OverrideGasOracle from chainlib.eth.gas import OverrideGasOracle
from chainqueue.query import get_tx from chainqueue.sql.query import get_tx
from chainqueue.state import set_cancel from chainqueue.sql.state import set_cancel
from chainqueue.db.models.otx import Otx from chainqueue.db.models.otx import Otx
from chainqueue.db.models.tx import TxCache from chainqueue.db.models.tx import TxCache
from hexathon import strip_0x from hexathon import strip_0x
@ -28,13 +28,14 @@ from cic_eth.admin.ctrl import (
) )
from cic_eth.queue.tx import queue_create from cic_eth.queue.tx import queue_create
from cic_eth.eth.gas import create_check_gas_task from cic_eth.eth.gas import create_check_gas_task
from cic_eth.task import BaseTask
celery_app = celery.current_app celery_app = celery.current_app
logg = logging.getLogger() logg = logging.getLogger()
@celery_app.task(bind=True) @celery_app.task(bind=True, base=BaseTask)
def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1): def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
"""Shift all transactions with nonces higher than the offset by the provided position delta. """Shift all transactions with nonces higher than the offset by the provided position delta.
Transactions who are replaced by transactions that move nonces will be marked as OVERRIDDEN. Transactions who are replaced by transactions that move nonces will be marked as OVERRIDDEN.
@ -45,7 +46,7 @@ def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1):
:type tx_hash_orig_hex: str, 0x-hex :type tx_hash_orig_hex: str, 0x-hex
:param delta: Amount :param delta: Amount
""" """
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_dict(chainspec_dict)
rpc = RPCConnection.connect(chain_spec, 'default') rpc = RPCConnection.connect(chain_spec, 'default')
rpc_signer = RPCConnection.connect(chain_spec, 'signer') rpc_signer = RPCConnection.connect(chain_spec, 'signer')
queue = None queue = None
@ -54,18 +55,20 @@ def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1):
except AttributeError: except AttributeError:
pass pass
session = SessionBase.create_session() session = BaseTask.session_func()
tx_brief = get_tx(chain_spec, tx_hash_orig_hex, session=session) tx_brief = get_tx(chain_spec, tx_hash_orig_hex, session=session)
tx_raw = bytes.fromhex(strip_0x(tx_brief['signed_tx'])) tx_raw = bytes.fromhex(strip_0x(tx_brief['signed_tx']))
tx = unpack(tx_raw, chain_spec) tx = unpack(tx_raw, chain_spec)
nonce = tx_brief['nonce'] nonce = tx_brief['nonce']
address = tx['from'] address = tx['from']
logg.debug('shifting nonce {} position(s) for address {}, offset {}'.format(delta, address, nonce)) logg.debug('shifting nonce {} position(s) for address {}, offset {}, hash {}'.format(delta, address, nonce, tx['hash']))
lock_queue(None, chain_spec.asdict(), address=address) lock_queue(None, chain_spec.asdict(), address=address)
lock_send(None, chain_spec.asdict(), address=address) lock_send(None, chain_spec.asdict(), address=address)
set_cancel(chain_spec, strip_0x(tx['hash']), manual=True, session=session)
q = session.query(Otx) q = session.query(Otx)
q = q.join(TxCache) q = q.join(TxCache)
q = q.filter(TxCache.sender==address) q = q.filter(TxCache.sender==address)
@ -83,13 +86,16 @@ def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1):
tx_previous_hash_hex = tx_new['hash'] tx_previous_hash_hex = tx_new['hash']
tx_previous_nonce = tx_new['nonce'] tx_previous_nonce = tx_new['nonce']
del(tx_new['hash'])
del(tx_new['hash_unsigned'])
tx_new['gas_price'] += 1 tx_new['gas_price'] += 1
tx_new['gasPrice'] = tx_new['gas_price'] tx_new['gasPrice'] = tx_new['gas_price']
tx_new['nonce'] -= delta tx_new['nonce'] -= delta
logg.debug('tx_nex {}'.format(tx_new)) logg.debug('tx_new {}'.format(tx_new))
del(tx_new['hash'])
del(tx_new['hash_unsigned'])
del(tx_new['hashUnsigned'])
gas_oracle = OverrideGasOracle(limit=tx_new['gas'], price=tx_new['gas_price'] + 1) # TODO: it should be possible to merely set this price here and if missing in the existing struct then fill it in (chainlib.eth.tx) gas_oracle = OverrideGasOracle(limit=tx_new['gas'], price=tx_new['gas_price'] + 1) # TODO: it should be possible to merely set this price here and if missing in the existing struct then fill it in (chainlib.eth.tx)
c = TxFactory(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle) c = TxFactory(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx_new) (tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx_new)
@ -101,15 +107,15 @@ def shift_nonce(self, chain_str, tx_hash_orig_hex, delta=1):
tx_signed_raw_hex, tx_signed_raw_hex,
) )
session.add(otx) session.add(otx)
session.commit()
# TODO: cancel all first, then replace. Otherwise we risk two non-locked states for two different nonces. # TODO: cancel all first, then replace. Otherwise we risk two non-locked states for two different nonces.
set_cancel(chain_spec, tx_previous_hash_hex, manual=True, session=session) set_cancel(chain_spec, strip_0x(tx_previous_hash_hex), manual=True, session=session)
TxCache.clone(tx_previous_hash_hex, tx_hash_hex, session=session) TxCache.clone(tx_previous_hash_hex, tx_hash_hex, session=session)
tx_hashes.append(tx_hash_hex) tx_hashes.append(tx_hash_hex)
txs.append(tx_signed_raw_hex) txs.append(tx_signed_raw_hex)
session.commit()
session.close() session.close()

View File

@ -30,13 +30,13 @@ from chainqueue.db.enum import (
status_str, status_str,
) )
from chainqueue.error import TxStateChangeError from chainqueue.error import TxStateChangeError
from chainqueue.query import get_tx
# local imports # local imports
from cic_eth.db.models.base import SessionBase from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.role import AccountRole from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.nonce import Nonce from cic_eth.db.models.nonce import Nonce
from cic_eth.error import InitializationError from cic_eth.error import InitializationError
from cic_eth.queue.query import get_tx
app = celery.current_app app = celery.current_app
@ -264,7 +264,8 @@ class AdminApi:
} }
def fix_nonce(self, address, nonce, chain_spec): # TODO: is risky since it does not validate that there is actually a nonce problem?
def fix_nonce(self, chain_spec, address, nonce):
s = celery.signature( s = celery.signature(
'cic_eth.queue.query.get_account_tx', 'cic_eth.queue.query.get_account_tx',
[ [
@ -278,15 +279,17 @@ class AdminApi:
txs = s.apply_async().get() txs = s.apply_async().get()
tx_hash_hex = None tx_hash_hex = None
session = SessionBase.create_session()
for k in txs.keys(): for k in txs.keys():
tx_dict = get_tx(k) tx_dict = get_tx(chain_spec, k, session=session)
if tx_dict['nonce'] == nonce: if tx_dict['nonce'] == nonce:
tx_hash_hex = k tx_hash_hex = k
session.close()
s_nonce = celery.signature( s_nonce = celery.signature(
'cic_eth.admin.nonce.shift_nonce', 'cic_eth.admin.nonce.shift_nonce',
[ [
self.rpc.chain_spec.asdict(), chain_spec.asdict(),
tx_hash_hex, tx_hash_hex,
], ],
queue=self.queue queue=self.queue

View File

@ -17,7 +17,7 @@ eth-address-index~=0.1.1a11
chainlib~=0.0.3rc2 chainlib~=0.0.3rc2
hexathon~=0.0.1a7 hexathon~=0.0.1a7
chainsyncer[sql]~=0.0.2a4 chainsyncer[sql]~=0.0.2a4
chainqueue~=0.0.2a2 chainqueue~=0.0.2b1
sarafu-faucet==0.0.3a3 sarafu-faucet==0.0.3a3
erc20-faucet==0.2.1a4 erc20-faucet==0.2.1a4
coincurve==15.0.0 coincurve==15.0.0

View File

@ -34,7 +34,10 @@ from chainqueue.state import (
set_ready, set_ready,
set_reserved, set_reserved,
) )
from chainqueue.query import get_tx from chainqueue.query import (
get_tx,
get_nonce_tx_cache,
)
# local imports # local imports
from cic_eth.api import AdminApi from cic_eth.api import AdminApi
@ -133,7 +136,6 @@ def test_tx(
logg.warning('code missing to verify tx contents {}'.format(tx)) logg.warning('code missing to verify tx contents {}'.format(tx))
def test_check_nonce_gap( def test_check_nonce_gap(
default_chain_spec, default_chain_spec,
init_database, init_database,
@ -236,3 +238,64 @@ def test_check_nonce_localfail(
assert r['nonce']['blocking'] == 4 assert r['nonce']['blocking'] == 4
assert r['tx']['blocking'] == tx_hashes[4] assert r['tx']['blocking'] == tx_hashes[4]
def test_fix_nonce(
default_chain_spec,
init_database,
eth_rpc,
eth_signer,
agent_roles,
contract_roles,
celery_session_worker,
init_celery_tasks,
caplog,
):
nonce_oracle = OverrideNonceOracle(agent_roles['ALICE'], 0)
gas_oracle = OverrideGasOracle(limit=21000, conn=eth_rpc)
tx_hashes = []
txs = []
for i in range(10):
c = Gas(default_chain_spec, signer=eth_signer, nonce_oracle=nonce_oracle, gas_oracle=gas_oracle)
(tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 100 * (10 ** 6), tx_format=TxFormat.RLP_SIGNED)
queue_create(
default_chain_spec,
i,
agent_roles['ALICE'],
tx_hash_hex,
tx_signed_raw_hex,
session=init_database,
)
cache_gas_data(
tx_hash_hex,
tx_signed_raw_hex,
default_chain_spec.asdict(),
)
tx_hashes.append(tx_hash_hex)
txs.append(tx_signed_raw_hex)
init_database.commit()
api = AdminApi(eth_rpc, queue=None, call_address=contract_roles['DEFAULT'])
t = api.fix_nonce(default_chain_spec, agent_roles['ALICE'], 3)
r = t.get_leaf()
assert t.successful()
init_database.commit()
txs = get_nonce_tx_cache(default_chain_spec, 3, agent_roles['ALICE'], session=init_database)
ks = txs.keys()
assert len(ks) == 2
for k in ks:
hsh = add_0x(k)
otx = Otx.load(hsh, session=init_database)
init_database.refresh(otx)
logg.debug('checking nonce {} tx {} status {}'.format(3, otx.tx_hash, otx.status))
if add_0x(k) == tx_hashes[3]:
assert otx.status & StatusBits.OBSOLETE == StatusBits.OBSOLETE
else:
assert otx.status == 1

View File

@ -65,7 +65,7 @@ def test_shift_nonce(
s = celery.signature( s = celery.signature(
'cic_eth.admin.nonce.shift_nonce', 'cic_eth.admin.nonce.shift_nonce',
[ [
str(default_chain_spec), default_chain_spec.asdict(),
tx_hashes[3], tx_hashes[3],
], ],
queue=None queue=None
@ -76,7 +76,7 @@ def test_shift_nonce(
init_database.commit() init_database.commit()
for i in range(42+4, 42+10): for i in range(42+3, 42+10):
txs = get_nonce_tx_cache(default_chain_spec, i, agent_roles['ALICE'], session=init_database) txs = get_nonce_tx_cache(default_chain_spec, i, agent_roles['ALICE'], session=init_database)
for k in txs.keys(): for k in txs.keys():
hsh = add_0x(k) hsh = add_0x(k)