2021-02-01 18:12:51 +01:00
# standard imports
import logging
2021-03-29 15:27:53 +02:00
# external imports
2021-02-01 18:12:51 +01:00
import celery
2021-03-29 15:27:53 +02:00
from chainlib . chain import ChainSpec
2021-05-31 17:34:16 +02:00
from chainlib . connection import RPCConnection
from chainlib . eth . tx import (
unpack ,
TxFactory ,
)
from chainlib . eth . gas import OverrideGasOracle
from chainqueue . sql . query import get_tx
from chainqueue . sql . state import set_cancel
2021-04-04 14:40:59 +02:00
from chainqueue . db . models . otx import Otx
from chainqueue . db . models . tx import TxCache
2021-07-21 19:34:51 +02:00
from hexathon import (
strip_0x ,
add_0x ,
uniform as hex_uniform ,
)
2021-05-31 17:34:16 +02:00
from potaahto . symbols import snake_and_camel
2021-02-01 18:12:51 +01:00
# local imports
from cic_eth . db . models . base import SessionBase
from cic_eth . db . models . nonce import Nonce
2021-03-29 15:27:53 +02:00
from cic_eth . admin . ctrl import (
lock_send ,
unlock_send ,
lock_queue ,
unlock_queue ,
)
2021-04-04 14:40:59 +02:00
from cic_eth . queue . tx import queue_create
from cic_eth . eth . gas import create_check_gas_task
2021-05-31 17:34:16 +02:00
from cic_eth . task import BaseTask
2021-02-01 18:12:51 +01:00
# Celery application currently in effect; used below to register the task(s) of this module.
celery_app = celery . current_app
# Module logger. No name is passed to getLogger(), so this is the root logger.
logg = logging . getLogger ( )
2021-05-31 17:34:16 +02:00
@celery_app.task(bind=True, base=BaseTask)
def shift_nonce(self, chainspec_dict, tx_hash_orig_hex, delta=1):
    """Shift queued transactions of a sender down by ``delta`` nonce positions.

    The transaction identified by ``tx_hash_orig_hex`` is resolved to its
    sender and nonce and is cancelled. Every queued transaction from the same
    sender whose nonce is at least ``nonce + delta`` is then re-signed with
    its nonce reduced by ``delta`` and a bumped gas price; the transaction it
    replaces is cancelled and its cache record is cloned to the new hash.

    The sender is locked for queueing and sending before any change is made.
    The unlock tasks are linked to the gas check task that is dispatched at
    the end, so they are only scheduled from there.

    :param chainspec_dict: Chain specification, dict representation
    :type chainspec_dict: dict
    :param tx_hash_orig_hex: Transaction hash to resolve to sender and nonce to use as shift offset
    :type tx_hash_orig_hex: str, 0x-hex
    :param delta: Number of nonce positions to shift by
    :type delta: int
    """
    chain_spec = ChainSpec.from_dict(chainspec_dict)
    # NOTE(review): `rpc` is not used below; the connect call is kept in case
    # it is relied upon to fail early when no default connection exists.
    rpc = RPCConnection.connect(chain_spec, 'default')
    rpc_signer = RPCConnection.connect(chain_spec, 'signer')

    # Follow-up tasks are sent to the queue this task was delivered on, when
    # that information is available on the request.
    queue = None
    try:
        queue = self.request.delivery_info.get('routing_key')
    except AttributeError:
        pass

    tx_hashes = []
    txs = []
    gas_total = 0

    session = BaseTask.session_func()
    try:
        tx_brief = get_tx(chain_spec, tx_hash_orig_hex, session=session)
        tx_raw = bytes.fromhex(strip_0x(tx_brief['signed_tx']))
        tx = unpack(tx_raw, chain_spec)
        nonce = tx_brief['nonce']
        address = tx['from']

        logg.debug('shifting nonce {} position(s) for address {}, offset {}, hash {}'.format(delta, address, nonce, tx['hash']))

        # NOTE(review): if anything below raises, these locks stay in place
        # because the unlock tasks are only linked at the very end.
        lock_queue(None, chain_spec.asdict(), address=address)
        lock_send(None, chain_spec.asdict(), address=address)

        set_cancel(chain_spec, strip_0x(tx['hash']), manual=True, session=session)

        # The sender is normalized through hexathon before it is compared
        # against the cached sender column.
        query_address = add_0x(hex_uniform(strip_0x(address)))
        q = session.query(Otx)
        q = q.join(TxCache)
        q = q.filter(TxCache.sender == query_address)
        q = q.filter(Otx.nonce >= nonce + delta)
        q = q.order_by(Otx.nonce.asc())
        otxs = q.all()

        for otx_previous in otxs:
            tx_raw = bytes.fromhex(strip_0x(otx_previous.signed_tx))
            tx_new = snake_and_camel(unpack(tx_raw, chain_spec))

            tx_previous_hash_hex = tx_new['hash']
            tx_previous_nonce = tx_new['nonce']

            # Bump the gas price and keep both key spellings in sync.
            tx_new['gas_price'] += 1
            tx_new['gasPrice'] = tx_new['gas_price']
            tx_new['nonce'] -= delta
            gas_total += tx_new['gas_price'] * tx_new['gas']
            logg.debug('tx_new {}'.format(tx_new))
            logg.debug('gas running total {}'.format(gas_total))

            # The hashes belong to the old transaction and must not be
            # carried over into the one being built.
            del tx_new['hash']
            del tx_new['hash_unsigned']
            del tx_new['hashUnsigned']

            # TODO: it should be possible to merely set this price here and if missing in the existing struct then fill it in (chainlib.eth.tx)
            # NOTE(review): the oracle price is one unit above the price
            # stored in tx_new and counted in gas_total; confirm which of the
            # two build_raw actually signs with.
            gas_oracle = OverrideGasOracle(limit=tx_new['gas'], price=tx_new['gas_price'] + 1)
            c = TxFactory(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle)
            (tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx_new)
            logg.debug('tx {} -> {} nonce {} -> {}'.format(tx_previous_hash_hex, tx_hash_hex, tx_previous_nonce, tx_new['nonce']))

            otx_new = Otx(
                tx_new['nonce'],
                tx_hash_hex,
                tx_signed_raw_hex,
            )
            session.add(otx_new)

            # TODO: cancel all first, then replace. Otherwise we risk two non-locked states for two different nonces.
            set_cancel(chain_spec, strip_0x(tx_previous_hash_hex), manual=True, session=session)

            TxCache.clone(tx_previous_hash_hex, tx_hash_hex, session=session)

            tx_hashes.append(tx_hash_hex)
            txs.append(tx_signed_raw_hex)
            session.commit()
    finally:
        # Always release the session, also when a lookup, signing or state
        # change above raises.
        session.close()

    s = create_check_gas_task(
        txs,
        chain_spec,
        address,
        gas=gas_total,
        tx_hashes_hex=tx_hashes,
        queue=queue,
    )

    s_unlock_send = celery.signature(
        'cic_eth.admin.ctrl.unlock_send',
        [
            chain_spec.asdict(),
            address,
        ],
        queue=queue,
    )
    s_unlock_direct = celery.signature(
        'cic_eth.admin.ctrl.unlock_queue',
        [
            chain_spec.asdict(),
            address,
        ],
        queue=queue,
    )
    s_unlocks = celery.group(s_unlock_send, s_unlock_direct)
    s.link(s_unlocks)
    s.apply_async()