685 lines
22 KiB
Python
685 lines
22 KiB
Python
# standard imports
|
||
import logging
|
||
|
||
# third-party imports
|
||
import celery
|
||
import requests
|
||
import web3
|
||
from cic_registry import zero_address
|
||
from cic_registry.chain import ChainSpec
|
||
|
||
# local imports
|
||
from .rpc import RpcClient
|
||
from cic_eth.db import Otx, SessionBase
|
||
from cic_eth.db.models.tx import TxCache
|
||
from cic_eth.db.models.lock import Lock
|
||
from cic_eth.db.enum import (
|
||
LockEnum,
|
||
StatusBits,
|
||
)
|
||
from cic_eth.error import PermanentTxError
|
||
from cic_eth.error import TemporaryTxError
|
||
from cic_eth.error import NotLocalTxError
|
||
from cic_eth.queue.tx import create as queue_create
|
||
from cic_eth.queue.tx import get_tx
|
||
from cic_eth.queue.tx import get_nonce_tx
|
||
from cic_eth.error import OutOfGasError
|
||
from cic_eth.error import LockedError
|
||
from cic_eth.eth.util import unpack_signed_raw_tx
|
||
from cic_eth.eth.task import sign_and_register_tx, create_check_gas_and_send_task
|
||
from cic_eth.eth.task import sign_tx
|
||
from cic_eth.eth.nonce import NonceOracle
|
||
from cic_eth.error import AlreadyFillingGasError
|
||
from cic_eth.eth.util import tx_hex_string
|
||
from cic_eth.admin.ctrl import lock_send
|
||
|
||
celery_app = celery.current_app
|
||
logg = logging.getLogger()
|
||
|
||
MAX_NONCE_ATTEMPTS = 3
|
||
|
||
|
||
# TODO this function is too long
|
||
@celery_app.task(bind=True, throws=(OutOfGasError))
|
||
def check_gas(self, tx_hashes, chain_str, txs=[], address=None, gas_required=None):
|
||
"""Check the gas level of the sender address of a transaction.
|
||
|
||
If the account balance is not sufficient for the required gas, gas refill is requested and OutOfGasError raiser.
|
||
|
||
If account balance is sufficient, but level of gas before spend is below "safe" threshold, gas refill is requested, and execution continues normally.
|
||
|
||
:param tx_hashes: Transaction hashes due to be submitted
|
||
:type tx_hashes: list of str, 0x-hex
|
||
:param chain_str: Chain spec string representation
|
||
:type chain_str: str
|
||
:param txs: Signed raw transaction data, corresponding to tx_hashes
|
||
:type txs: list of str, 0x-hex
|
||
:param address: Sender address
|
||
:type address: str, 0x-hex
|
||
:param gas_required: Gas limit * gas price for transaction, (optional, if not set will be retrived from transaction data)
|
||
:type gas_required: int
|
||
:return: Signed raw transaction data list
|
||
:rtype: param txs, unchanged
|
||
"""
|
||
if len(txs) == 0:
|
||
for i in range(len(tx_hashes)):
|
||
o = get_tx(tx_hashes[i])
|
||
txs.append(o['signed_tx'])
|
||
logg.debug('ooooo {}'.format(o))
|
||
if address == None:
|
||
address = o['address']
|
||
|
||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||
|
||
queue = self.request.delivery_info['routing_key']
|
||
|
||
#c = RpcClient(chain_spec, holder_address=address)
|
||
c = RpcClient(chain_spec)
|
||
|
||
# TODO: it should not be necessary to pass address explicitly, if not passed should be derived from the tx
|
||
balance = c.w3.eth.getBalance(address)
|
||
logg.debug('check gas txs {}'.format(tx_hashes))
|
||
logg.debug('address {} has gas {} needs {}'.format(address, balance, gas_required))
|
||
|
||
if gas_required > balance:
|
||
s_refill_gas = celery.signature(
|
||
'cic_eth.eth.tx.refill_gas',
|
||
[
|
||
address,
|
||
chain_str,
|
||
],
|
||
queue=queue,
|
||
)
|
||
s_refill_gas.apply_async()
|
||
wait_tasks = []
|
||
for tx_hash in tx_hashes:
|
||
s = celery.signature(
|
||
'cic_eth.queue.tx.set_waitforgas',
|
||
[
|
||
tx_hash,
|
||
],
|
||
queue=queue,
|
||
)
|
||
wait_tasks.append(s)
|
||
celery.group(wait_tasks)()
|
||
raise OutOfGasError('need to fill gas, required {}, had {}'.format(gas_required, balance))
|
||
|
||
safe_gas = c.safe_threshold_amount()
|
||
if balance < safe_gas:
|
||
s_refill_gas = celery.signature(
|
||
'cic_eth.eth.tx.refill_gas',
|
||
[
|
||
address,
|
||
chain_str,
|
||
],
|
||
queue=queue,
|
||
)
|
||
s_refill_gas.apply_async()
|
||
logg.debug('requested refill from {} to {}'.format(c.gas_provider(), address))
|
||
ready_tasks = []
|
||
for tx_hash in tx_hashes:
|
||
s = celery.signature(
|
||
'cic_eth.queue.tx.set_ready',
|
||
[
|
||
tx_hash,
|
||
],
|
||
queue=queue,
|
||
)
|
||
ready_tasks.append(s)
|
||
logg.debug('tasks {}'.format(ready_tasks))
|
||
celery.group(ready_tasks)()
|
||
|
||
return txs
|
||
|
||
|
||
# TODO: chain chainable transactions that use hashes as inputs may be chained to this function to output signed txs instead.
|
||
@celery_app.task(bind=True)
|
||
def hashes_to_txs(self, tx_hashes):
|
||
"""Return a list of raw signed transactions from the local transaction queue corresponding to a list of transaction hashes.
|
||
|
||
:param tx_hashes: Transaction hashes
|
||
:type tx_hashes: list of str, 0x-hex
|
||
:raises ValueError: Empty input list
|
||
:returns: Signed raw transactions
|
||
:rtype: list of str, 0x-hex
|
||
"""
|
||
#logg = celery_app.log.get_default_logger()
|
||
if len(tx_hashes) == 0:
|
||
raise ValueError('no transaction to send')
|
||
|
||
queue = self.request.delivery_info['routing_key']
|
||
|
||
#otxs = ','.format("'{}'".format(tx_hash) for tx_hash in tx_hashes)
|
||
|
||
session = SessionBase.create_session()
|
||
q = session.query(Otx.signed_tx)
|
||
q = q.filter(Otx.tx_hash.in_(tx_hashes))
|
||
tx_tuples = q.all()
|
||
session.close()
|
||
|
||
def __head(x):
|
||
return x[0]
|
||
|
||
txs = []
|
||
for f in map(__head, tx_tuples):
|
||
txs.append(f)
|
||
|
||
return txs
|
||
|
||
|
||
# TODO: Move this and send to subfolder submodule
|
||
class ParityNodeHandler:
|
||
def __init__(self, chain_spec, queue):
|
||
self.chain_spec = chain_spec
|
||
self.chain_str = str(chain_spec)
|
||
self.queue = queue
|
||
|
||
def handle(self, exception, tx_hash_hex, tx_hex):
|
||
meth = self.handle_default
|
||
if isinstance(exception, (ValueError)):
|
||
# s_debug = celery.signature(
|
||
# 'cic_eth.admin.debug.out_tmp',
|
||
# [tx_hash_hex, '{}: {}'.format(tx_hash_hex, exception)],
|
||
# queue=queue,
|
||
# )
|
||
# s_debug.apply_async()
|
||
earg = exception.args[0]
|
||
if earg['code'] == -32010:
|
||
logg.debug('skipping lock for code {}'.format(earg['code']))
|
||
meth = self.handle_invalid_parameters
|
||
elif earg['code'] == -32602:
|
||
meth = self.handle_invalid_encoding
|
||
else:
|
||
meth = self.handle_invalid
|
||
elif isinstance(exception, (requests.exceptions.ConnectionError)):
|
||
meth = self.handle_connection
|
||
(t, e_fn, message) = meth(tx_hash_hex, tx_hex)
|
||
return (t, e_fn, '{} {}'.format(message, exception))
|
||
|
||
|
||
def handle_connection(self, tx_hash_hex, tx_hex):
|
||
s_set_sent = celery.signature(
|
||
'cic_eth.queue.tx.set_sent_status',
|
||
[
|
||
tx_hash_hex,
|
||
True,
|
||
],
|
||
queue=self.queue,
|
||
)
|
||
t = s_set_sent.apply_async()
|
||
return (t, TemporaryTxError, 'Sendfail {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
|
||
|
||
|
||
def handle_invalid_encoding(self, tx_hash_hex, tx_hex):
|
||
tx_bytes = bytes.fromhex(tx_hex[2:])
|
||
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
|
||
s_lock = celery.signature(
|
||
'cic_eth.admin.ctrl.lock_send',
|
||
[
|
||
tx_hash_hex,
|
||
self.chain_str,
|
||
tx['from'],
|
||
tx_hash_hex,
|
||
],
|
||
queue=self.queue,
|
||
)
|
||
s_set_reject = celery.signature(
|
||
'cic_eth.queue.tx.set_rejected',
|
||
[],
|
||
queue=self.queue,
|
||
)
|
||
nonce_txs = get_nonce_tx(tx['nonce'], tx['from'], self.chain_spec.chain_id())
|
||
attempts = len(nonce_txs)
|
||
if attempts < MAX_NONCE_ATTEMPTS:
|
||
logg.debug('nonce {} address {} retries {} < {}'.format(tx['nonce'], tx['from'], attempts, MAX_NONCE_ATTEMPTS))
|
||
s_resend = celery.signature(
|
||
'cic_eth.eth.tx.resend_with_higher_gas',
|
||
[
|
||
self.chain_str,
|
||
None,
|
||
1.01,
|
||
],
|
||
queue=self.queue,
|
||
)
|
||
s_unlock = celery.signature(
|
||
'cic_eth.admin.ctrl.unlock_send',
|
||
[
|
||
self.chain_str,
|
||
tx['from'],
|
||
],
|
||
queue=self.queue,
|
||
)
|
||
s_resend.link(s_unlock)
|
||
s_set_reject.link(s_resend)
|
||
|
||
s_lock.link(s_set_reject)
|
||
t = s_lock.apply_async()
|
||
return (t, PermanentTxError, 'Reject invalid encoding {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
|
||
|
||
|
||
def handle_invalid_parameters(self, tx_hash_hex, tx_hex):
|
||
s_sync = celery.signature(
|
||
'cic_eth.eth.tx.sync_tx',
|
||
[
|
||
tx_hash_hex,
|
||
self.chain_str,
|
||
],
|
||
queue=self.queue,
|
||
)
|
||
t = s_sync.apply_async()
|
||
return (t, PermanentTxError, 'Reject invalid parameters {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
|
||
|
||
|
||
def handle_invalid(self, tx_hash_hex, tx_hex):
|
||
tx_bytes = bytes.fromhex(tx_hex[2:])
|
||
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
|
||
s_lock = celery.signature(
|
||
'cic_eth.admin.ctrl.lock_send',
|
||
[
|
||
tx_hash_hex,
|
||
self.chain_str,
|
||
tx['from'],
|
||
tx_hash_hex,
|
||
],
|
||
queue=self.queue,
|
||
)
|
||
s_set_reject = celery.signature(
|
||
'cic_eth.queue.tx.set_rejected',
|
||
[],
|
||
queue=self.queue,
|
||
)
|
||
s_lock.link(s_set_reject)
|
||
t = s_lock.apply_async()
|
||
return (t, PermanentTxError, 'Reject invalid {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
|
||
|
||
|
||
def handle_default(self, tx_hash_hex, tx_hex):
|
||
tx_bytes = bytes.fromhex(tx_hex[2:])
|
||
tx = unpack_signed_raw_tx(tx_bytes, self.chain_spec.chain_id())
|
||
s_lock = celery.signature(
|
||
'cic_eth.admin.ctrl.lock_send',
|
||
[
|
||
tx_hash_hex,
|
||
self.chain_str,
|
||
tx['from'],
|
||
tx_hash_hex,
|
||
],
|
||
queue=self.queue,
|
||
)
|
||
s_set_fubar = celery.signature(
|
||
'cic_eth.queue.tx.set_fubar',
|
||
[],
|
||
queue=self.queue,
|
||
)
|
||
s_lock.link(s_set_fubar)
|
||
t = s_lock.apply_async()
|
||
return (t, PermanentTxError, 'Fubar {}'.format(tx_hex_string(tx_hex, self.chain_spec.chain_id())))
|
||
|
||
|
||
@celery_app.task(bind=True)
|
||
def send(self, txs, chain_str):
|
||
"""Send transactions to the network.
|
||
|
||
If more than one transaction is passed to the task, it will spawn a new send task with the remaining transaction(s) after the first in the list has been processed.
|
||
|
||
Updates the outgoing transaction queue entry to SENT on successful send.
|
||
|
||
If a temporary error occurs, the queue entry is set to SENDFAIL.
|
||
|
||
If a permanent error occurs due to invalid transaction data, queue entry value is set to REJECTED.
|
||
|
||
Any other permanent error that isn't explicitly handled will get value FUBAR.
|
||
|
||
:param txs: Signed raw transaction data
|
||
:type txs: list of str, 0x-hex
|
||
:param chain_str: Chain spec, string representation
|
||
:type chain_str: str
|
||
:raises TemporaryTxError: If unable to connect to node
|
||
:raises PermanentTxError: If EVM execution fails immediately due to tx input, or if tx contents are invalid.
|
||
:return: transaction hash of sent transaction
|
||
:rtype: str, 0x-hex
|
||
"""
|
||
if len(txs) == 0:
|
||
raise ValueError('no transaction to send')
|
||
|
||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||
|
||
tx_hex = txs[0]
|
||
logg.debug('send transaction {}'.format(tx_hex))
|
||
|
||
tx_hash = web3.Web3.keccak(hexstr=tx_hex)
|
||
tx_hash_hex = tx_hash.hex()
|
||
|
||
queue = self.request.delivery_info.get('routing_key', None)
|
||
if queue == None:
|
||
logg.debug('send tx {} has no queue', tx_hash)
|
||
|
||
c = RpcClient(chain_spec)
|
||
r = None
|
||
try:
|
||
r = c.w3.eth.send_raw_transaction(tx_hex)
|
||
except Exception as e:
|
||
logg.debug('e {}'.format(e))
|
||
raiser = ParityNodeHandler(chain_spec, queue)
|
||
(t, e, m) = raiser.handle(e, tx_hash_hex, tx_hex)
|
||
raise e(m)
|
||
|
||
s_set_sent = celery.signature(
|
||
'cic_eth.queue.tx.set_sent_status',
|
||
[
|
||
tx_hash_hex,
|
||
False
|
||
],
|
||
queue=queue,
|
||
)
|
||
s_set_sent.apply_async()
|
||
|
||
tx_tail = txs[1:]
|
||
if len(tx_tail) > 0:
|
||
s = celery.signature(
|
||
'cic_eth.eth.tx.send',
|
||
[tx_tail],
|
||
queue=queue,
|
||
)
|
||
s.apply_async()
|
||
|
||
return r.hex()
|
||
|
||
|
||
@celery_app.task(bind=True, throws=(AlreadyFillingGasError))
|
||
def refill_gas(self, recipient_address, chain_str):
|
||
"""Executes a native token transaction to fund the recipient's gas expenditures.
|
||
|
||
:param recipient_address: Recipient in need of gas
|
||
:type recipient_address: str, 0x-hex
|
||
:param chain_str: Chain spec, string representation
|
||
:type chain_str: str
|
||
:raises AlreadyFillingGasError: A gas refill transaction for this address is already executing
|
||
:returns: Transaction hash.
|
||
:rtype: str, 0x-hex
|
||
"""
|
||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||
|
||
session = SessionBase.create_session()
|
||
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
|
||
q = session.query(Otx.tx_hash)
|
||
q = q.join(TxCache)
|
||
q = q.filter(Otx.status.op('&')(StatusBits.FINAL.value)==0)
|
||
q = q.filter(TxCache.from_value!=0)
|
||
q = q.filter(TxCache.recipient==recipient_address)
|
||
c = q.count()
|
||
session.close()
|
||
if c > 0:
|
||
raise AlreadyFillingGasError(recipient_address)
|
||
|
||
queue = self.request.delivery_info['routing_key']
|
||
|
||
c = RpcClient(chain_spec)
|
||
clogg = celery_app.log.get_default_logger()
|
||
logg.debug('refill gas from provider address {}'.format(c.gas_provider()))
|
||
default_nonce = c.w3.eth.getTransactionCount(c.gas_provider(), 'pending')
|
||
nonce_generator = NonceOracle(c.gas_provider(), default_nonce)
|
||
nonce = nonce_generator.next()
|
||
gas_price = c.gas_price()
|
||
gas_limit = c.default_gas_limit
|
||
refill_amount = c.refill_amount()
|
||
logg.debug('gas price {} nonce {}'.format(gas_price, nonce))
|
||
|
||
# create and sign transaction
|
||
tx_send_gas = {
|
||
'from': c.gas_provider(),
|
||
'to': recipient_address,
|
||
'gas': gas_limit,
|
||
'gasPrice': gas_price,
|
||
'chainId': chain_spec.chain_id(),
|
||
'nonce': nonce,
|
||
'value': refill_amount,
|
||
'data': '',
|
||
}
|
||
logg.debug('txsend_gas {}'.format(tx_send_gas))
|
||
tx_send_gas_signed = c.w3.eth.sign_transaction(tx_send_gas)
|
||
tx_hash = web3.Web3.keccak(hexstr=tx_send_gas_signed['raw'])
|
||
tx_hash_hex = tx_hash.hex()
|
||
|
||
# TODO: route this through sign_and_register_tx instead
|
||
logg.debug('adding queue refill gas tx {}'.format(tx_hash_hex))
|
||
queue_create(
|
||
nonce,
|
||
c.gas_provider(),
|
||
tx_hash_hex,
|
||
tx_send_gas_signed['raw'],
|
||
chain_str,
|
||
)
|
||
|
||
s_tx_cache = celery.signature(
|
||
'cic_eth.eth.tx.cache_gas_refill_data',
|
||
[
|
||
tx_hash_hex,
|
||
tx_send_gas,
|
||
],
|
||
queue=queue,
|
||
)
|
||
s_status = celery.signature(
|
||
'cic_eth.queue.tx.set_ready',
|
||
[
|
||
tx_hash_hex,
|
||
],
|
||
queue=queue,
|
||
)
|
||
celery.group(s_tx_cache, s_status)()
|
||
return tx_send_gas_signed['raw']
|
||
|
||
|
||
@celery_app.task(bind=True)
|
||
def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_factor=1.1):
|
||
"""Create a new transaction from an existing one with same nonce and higher gas price.
|
||
|
||
:param txold_hash_hex: Transaction to re-create
|
||
:type txold_hash_hex: str, 0x-hex
|
||
:param chain_str: Chain spec, string representation
|
||
:type chain_str: str
|
||
:param gas: Explicitly use the specified gas amount
|
||
:type gas: number
|
||
:param default_factor: Default factor by which to increment the gas price by
|
||
:type default_factor: float
|
||
:raises NotLocalTxError: Transaction does not exist in the local queue
|
||
:returns: Transaction hash
|
||
:rtype: str, 0x-hex
|
||
"""
|
||
session = SessionBase.create_session()
|
||
otx = session.query(Otx).filter(Otx.tx_hash==txold_hash_hex).first()
|
||
if otx == None:
|
||
session.close()
|
||
raise NotLocalTxError(txold_hash_hex)
|
||
session.close()
|
||
|
||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||
c = RpcClient(chain_spec)
|
||
|
||
tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx[2:])
|
||
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
|
||
logg.debug('resend otx {} {}'.format(tx, otx.signed_tx))
|
||
|
||
queue = self.request.delivery_info['routing_key']
|
||
|
||
logg.debug('before {}'.format(tx))
|
||
if gas != None:
|
||
tx['gasPrice'] = gas
|
||
else:
|
||
gas_price = c.gas_price()
|
||
if tx['gasPrice'] > gas_price:
|
||
logg.warning('Network gas price {} is lower than overdue tx gas price {}'.format(gas_price, tx['gasPrice']))
|
||
#tx['gasPrice'] = int(tx['gasPrice'] * default_factor)
|
||
tx['gasPrice'] += 1
|
||
else:
|
||
new_gas_price = int(tx['gasPrice'] * default_factor)
|
||
if gas_price > new_gas_price:
|
||
tx['gasPrice'] = gas_price
|
||
else:
|
||
tx['gasPrice'] = new_gas_price
|
||
|
||
logg.debug('after {}'.format(tx))
|
||
|
||
#(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx, chain_str, queue)
|
||
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str)
|
||
queue_create(
|
||
tx['nonce'],
|
||
tx['from'],
|
||
tx_hash_hex,
|
||
tx_signed_raw_hex,
|
||
chain_str,
|
||
)
|
||
TxCache.clone(txold_hash_hex, tx_hash_hex)
|
||
|
||
s = create_check_gas_and_send_task(
|
||
[tx_signed_raw_hex],
|
||
chain_str,
|
||
tx['from'],
|
||
tx['gasPrice'] * tx['gas'],
|
||
[tx_hash_hex],
|
||
queue=queue,
|
||
)
|
||
s.apply_async()
|
||
return tx_hash_hex
|
||
|
||
|
||
@celery_app.task(bind=True, throws=(web3.exceptions.TransactionNotFound,))
|
||
def sync_tx(self, tx_hash_hex, chain_str):
|
||
|
||
queue = self.request.delivery_info['routing_key']
|
||
|
||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||
c = RpcClient(chain_spec)
|
||
|
||
tx = c.w3.eth.getTransaction(tx_hash_hex)
|
||
rcpt = None
|
||
try:
|
||
rcpt = c.w3.eth.getTransactionReceipt(tx_hash_hex)
|
||
except web3.exceptions.TransactionNotFound as e:
|
||
pass
|
||
|
||
if rcpt != None:
|
||
success = rcpt['status'] == 1
|
||
logg.debug('sync tx {} mined block {} success {}'.format(tx_hash_hex, rcpt['blockNumber'], success))
|
||
|
||
s = celery.signature(
|
||
'cic_eth.queue.tx.set_final_status',
|
||
[
|
||
tx_hash_hex,
|
||
rcpt['blockNumber'],
|
||
not success,
|
||
],
|
||
queue=queue,
|
||
)
|
||
else:
|
||
logg.debug('sync tx {} mempool'.format(tx_hash_hex))
|
||
|
||
s = celery.signature(
|
||
'cic_eth.queue.tx.set_sent_status',
|
||
[
|
||
tx_hash_hex,
|
||
],
|
||
queue=queue,
|
||
)
|
||
|
||
s.apply_async()
|
||
|
||
|
||
|
||
@celery_app.task(bind=True)
|
||
def resume_tx(self, txpending_hash_hex, chain_str):
|
||
"""Queue a suspended tranaction for (re)sending
|
||
|
||
:param txpending_hash_hex: Transaction hash
|
||
:type txpending_hash_hex: str, 0x-hex
|
||
:param chain_str: Chain spec, string representation
|
||
:type chain_str: str
|
||
:raises NotLocalTxError: Transaction does not exist in the local queue
|
||
:returns: Transaction hash
|
||
:rtype: str, 0x-hex
|
||
"""
|
||
|
||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||
|
||
session = SessionBase.create_session()
|
||
r = session.query(Otx.signed_tx).filter(Otx.tx_hash==txpending_hash_hex).first()
|
||
session.close()
|
||
if r == None:
|
||
raise NotLocalTxError(txpending_hash_hex)
|
||
|
||
tx_signed_raw_hex = r[0]
|
||
tx_signed_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
|
||
tx = unpack_signed_raw_tx(tx_signed_bytes, chain_spec.chain_id())
|
||
|
||
queue = self.request.delivery_info['routing_key']
|
||
|
||
s = create_check_gas_and_send_task(
|
||
[tx_signed_raw_hex],
|
||
chain_str,
|
||
tx['from'],
|
||
tx['gasPrice'] * tx['gas'],
|
||
[txpending_hash_hex],
|
||
queue=queue,
|
||
)
|
||
s.apply_async()
|
||
return txpending_hash_hex
|
||
|
||
|
||
@celery_app.task()
|
||
def otx_cache_parse_tx(
|
||
tx_hash_hex,
|
||
tx_signed_raw_hex,
|
||
chain_str,
|
||
):
|
||
"""Generates and commits transaction cache metadata for a gas refill transaction
|
||
|
||
:param tx_hash_hex: Transaction hash
|
||
:type tx_hash_hex: str, 0x-hex
|
||
:param tx_signed_raw_hex: Raw signed transaction
|
||
:type tx_signed_raw_hex: str, 0x-hex
|
||
:param chain_str: Chain spec string representation
|
||
:type chain_str: str
|
||
:returns: Transaction hash and id of cache element in storage backend, respectively
|
||
:rtype: tuple
|
||
"""
|
||
|
||
|
||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
||
c = RpcClient(chain_spec)
|
||
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
|
||
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
|
||
(txc, cache_id) = cache_gas_refill_data(tx_hash_hex, tx)
|
||
return txc
|
||
|
||
|
||
@celery_app.task()
|
||
def cache_gas_refill_data(
|
||
tx_hash_hex,
|
||
tx,
|
||
):
|
||
"""Helper function for otx_cache_parse_tx
|
||
|
||
:param tx_hash_hex: Transaction hash
|
||
:type tx_hash_hex: str, 0x-hex
|
||
:param tx: Signed raw transaction
|
||
:type tx: str, 0x-hex
|
||
:returns: Transaction hash and id of cache element in storage backend, respectively
|
||
:rtype: tuple
|
||
"""
|
||
tx_cache = TxCache(
|
||
tx_hash_hex,
|
||
tx['from'],
|
||
tx['to'],
|
||
zero_address,
|
||
zero_address,
|
||
tx['value'],
|
||
tx['value'],
|
||
)
|
||
|
||
session = SessionBase.create_session()
|
||
session.add(tx_cache)
|
||
session.commit()
|
||
cache_id = tx_cache.id
|
||
session.close()
|
||
return (tx_hash_hex, cache_id)
|