cic-internal-integration/apps/cic-eth/cic_eth/admin/nonce.py

150 lines
4.7 KiB
Python
Raw Permalink Normal View History

2021-02-01 18:12:51 +01:00
# standard imports
import logging
# external imports
2021-02-01 18:12:51 +01:00
import celery
from chainlib.chain import ChainSpec
2021-05-31 17:34:16 +02:00
from chainlib.connection import RPCConnection
from chainlib.eth.tx import (
unpack,
TxFactory,
)
from chainlib.eth.gas import OverrideGasOracle
from chainqueue.sql.query import get_tx
from chainqueue.sql.state import set_cancel
2021-04-04 14:40:59 +02:00
from chainqueue.db.models.otx import Otx
from chainqueue.db.models.tx import TxCache
2021-05-31 17:34:16 +02:00
from hexathon import strip_0x
from potaahto.symbols import snake_and_camel
2021-02-01 18:12:51 +01:00
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.nonce import Nonce
from cic_eth.admin.ctrl import (
lock_send,
unlock_send,
lock_queue,
unlock_queue,
)
2021-04-04 14:40:59 +02:00
from cic_eth.queue.tx import queue_create
from cic_eth.eth.gas import create_check_gas_task
2021-05-31 17:34:16 +02:00
from cic_eth.task import BaseTask
2021-02-01 18:12:51 +01:00
# Celery application proxy used to register the task(s) defined in this module
celery_app = celery.current_app
# NOTE(review): getLogger() without a name returns the root logger — confirm this is intended rather than a module-level logger
logg = logging.getLogger()
2021-05-31 17:34:16 +02:00
@celery_app.task(bind=True, base=BaseTask)
def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
    """Shift all transactions with nonces higher than the offset by the provided position delta.

    The transaction identified by tx_hash_orig_hex is cancelled. Every queued transaction from the
    same sender whose nonce is at least (offset nonce + delta) is then re-signed with its nonce
    reduced by delta and a raised gas price, and the transaction it replaces is cancelled.

    Transactions who are replaced by transactions that move nonces will be marked as OVERRIDDEN.

    Queue and send locks are set on the sender address before the shift. They are released by tasks
    linked to the gas check task for the replacement transactions, or immediately if there was
    nothing to replace.

    :param chainspec_dict: Chain specification dict representation
    :type chainspec_dict: dict
    :param tx_hash_orig_hex: Transaction hash to resolve to sender and nonce to use as shift offset
    :type tx_hash_orig_hex: str, 0x-hex
    :param delta: Amount of nonce positions to shift by
    :type delta: int
    """
    chain_spec = ChainSpec.from_dict(chainspec_dict)
    # NOTE(review): the default connection is not used below; kept in case connecting has side effects
    rpc = RPCConnection.connect(chain_spec, 'default')
    rpc_signer = RPCConnection.connect(chain_spec, 'signer')

    # Follow-up tasks are sent to the queue this task was delivered on, when that is available
    queue = None
    try:
        queue = self.request.delivery_info.get('routing_key')
    except AttributeError:
        pass

    tx_hashes = []
    txs = []
    gas = None

    session = BaseTask.session_func()
    try:
        tx_brief = get_tx(chain_spec, tx_hash_orig_hex, session=session)
        tx_raw = bytes.fromhex(strip_0x(tx_brief['signed_tx']))
        tx = unpack(tx_raw, chain_spec)
        nonce = tx_brief['nonce']
        address = tx['from']

        logg.debug('shifting nonce {} position(s) for address {}, offset {}, hash {}'.format(delta, address, nonce, tx['hash']))

        lock_queue(None, chain_spec.asdict(), address=address)
        lock_send(None, chain_spec.asdict(), address=address)

        # Cancel the transaction whose nonce is the shift offset
        set_cancel(chain_spec, strip_0x(tx['hash']), manual=True, session=session)

        # All queued transactions from the same sender at or beyond offset + delta, lowest nonce first
        q = session.query(Otx)
        q = q.join(TxCache)
        q = q.filter(TxCache.sender==address)
        q = q.filter(Otx.nonce>=nonce+delta)
        q = q.order_by(Otx.nonce.asc())
        otxs = q.all()

        for otx in otxs:
            tx_raw = bytes.fromhex(strip_0x(otx.signed_tx))
            tx_new = unpack(tx_raw, chain_spec)
            tx_new = snake_and_camel(tx_new)

            tx_previous_hash_hex = tx_new['hash']
            tx_previous_nonce = tx_new['nonce']

            # Raise the gas price and move the transaction down by delta nonce positions
            tx_new['gas_price'] += 1
            tx_new['gasPrice'] = tx_new['gas_price']
            tx_new['nonce'] -= delta

            logg.debug('tx_new {}'.format(tx_new))

            # The hashes belong to the transaction being replaced; they are removed before re-signing
            del tx_new['hash']
            del tx_new['hash_unsigned']
            del tx_new['hashUnsigned']

            gas_oracle = OverrideGasOracle(limit=tx_new['gas'], price=tx_new['gas_price'] + 1) # TODO: it should be possible to merely set this price here and if missing in the existing struct then fill it in (chainlib.eth.tx)
            c = TxFactory(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle)
            (tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx_new)
            logg.debug('tx {} -> {} nonce {} -> {}'.format(tx_previous_hash_hex, tx_hash_hex, tx_previous_nonce, tx_new['nonce']))

            # Separate name for the new record, so the loop variable is not rebound
            otx_new = Otx(
                tx_new['nonce'],
                tx_hash_hex,
                tx_signed_raw_hex,
                )
            session.add(otx_new)

            # TODO: cancel all first, then replace. Otherwise we risk two non-locked states for two different nonces.
            set_cancel(chain_spec, strip_0x(tx_previous_hash_hex), manual=True, session=session)

            TxCache.clone(tx_previous_hash_hex, tx_hash_hex, session=session)

            tx_hashes.append(tx_hash_hex)
            txs.append(tx_signed_raw_hex)
            # Gas limit of the last replacement is what gets passed to the gas check task
            gas = tx_new['gas']
            session.commit()

        # Persist anything still pending, also when the loop above did not run
        session.commit()
    finally:
        # Release the session on the error path too
        session.close()

    if not txs:
        # Nothing was queued beyond the offset, so there is no replacement to check gas for.
        # Release the locks set above instead of failing on the missing replacement transaction.
        # NOTE(review): called the same way as lock_queue/lock_send above — confirm the unlock functions accept the same arguments
        logg.debug('no transactions to shift for address {}, offset {}'.format(address, nonce))
        unlock_send(None, chain_spec.asdict(), address=address)
        unlock_queue(None, chain_spec.asdict(), address=address)
        return

    s = create_check_gas_task(
            txs,
            chain_spec,
            address,
            gas=gas,
            tx_hashes_hex=tx_hashes,
            queue=queue,
            )

    # The unlocks target the same address the locks were set on
    s_unlock_send = celery.signature(
            'cic_eth.admin.ctrl.unlock_send',
            [
                chain_spec.asdict(),
                address,
                ],
            queue=queue,
            )
    s_unlock_direct = celery.signature(
            'cic_eth.admin.ctrl.unlock_queue',
            [
                chain_spec.asdict(),
                address,
                ],
            queue=queue,
            )
    s_unlocks = celery.group(s_unlock_send, s_unlock_direct)
    s.link(s_unlocks)
    s.apply_async()