cic-internal-integration/apps/cic-eth/cic_eth/queue/tx.py

95 lines
2.9 KiB
Python
Raw Permalink Normal View History

2021-02-01 18:12:51 +01:00
# standard imports
import logging
import time
import datetime
# external imports
2021-02-01 18:12:51 +01:00
import celery
2021-04-04 14:40:59 +02:00
from chainqueue.db.models.otx import Otx
from chainqueue.db.models.otx import OtxStateLog
from chainqueue.db.models.tx import TxCache
2021-02-17 11:04:21 +01:00
from hexathon import strip_0x
2021-02-01 18:12:51 +01:00
from sqlalchemy import or_
from sqlalchemy import not_
2021-02-01 18:12:51 +01:00
from sqlalchemy import tuple_
from sqlalchemy import func
2021-04-04 14:40:59 +02:00
from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack
2021-04-04 14:40:59 +02:00
import chainqueue.state
from chainqueue.db.enum import (
StatusEnum,
StatusBits,
is_alive,
2021-02-17 11:04:21 +01:00
dead,
)
2021-04-04 14:40:59 +02:00
from chainqueue.tx import create
from chainqueue.error import NotLocalTxError
from chainqueue.db.enum import status_str
# local imports
from cic_eth.db.models.lock import Lock
from cic_eth.db import SessionBase
from cic_eth.db.enum import LockEnum
2021-03-01 21:15:17 +01:00
from cic_eth.task import CriticalSQLAlchemyTask
2021-02-01 18:12:51 +01:00
from cic_eth.error import LockedError
celery_app = celery.current_app
logg = logging.getLogger()
2021-04-04 14:40:59 +02:00
def queue_create(chain_spec, nonce, holder_address, tx_hash, signed_tx, session=None):
session = SessionBase.bind_session(session)
2021-04-04 14:40:59 +02:00
lock = Lock.check_aggregate(str(chain_spec), LockEnum.QUEUE, holder_address, session=session)
2021-02-01 18:12:51 +01:00
if lock > 0:
SessionBase.release_session(session)
2021-02-01 18:12:51 +01:00
raise LockedError(lock)
2021-04-04 14:40:59 +02:00
tx_hash = create(chain_spec, nonce, holder_address, tx_hash, signed_tx, chain_spec, session=session)
SessionBase.release_session(session)
2021-04-04 14:40:59 +02:00
2021-02-01 18:12:51 +01:00
return tx_hash
def register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=None, session=None):
"""Signs the provided transaction, and adds it to the transaction queue cache (with status PENDING).
:param tx: Standard ethereum transaction data
:type tx: dict
:param chain_spec: Chain spec of transaction to add to queue
:type chain_spec: chainlib.chain.ChainSpec
:param queue: Task queue
:type queue: str
:param cache_task: Cache task to call with signed transaction. If None, no task will be called.
:type cache_task: str
:raises: sqlalchemy.exc.DatabaseError
:returns: Tuple; Transaction hash, signed raw transaction data
:rtype: tuple
"""
logg.debug('adding queue tx {}:{} -> {}'.format(chain_spec, tx_hash_hex, tx_signed_raw_hex))
tx_signed_raw = bytes.fromhex(strip_0x(tx_signed_raw_hex))
2021-04-04 14:40:59 +02:00
tx = unpack(tx_signed_raw, chain_spec)
2021-04-04 14:40:59 +02:00
tx_hash = queue_create(
chain_spec,
tx['nonce'],
tx['from'],
tx_hash_hex,
tx_signed_raw_hex,
session=session,
)
if cache_task != None:
logg.debug('adding cache task {} tx {}'.format(cache_task, tx_hash_hex))
s_cache = celery.signature(
cache_task,
[
tx_hash_hex,
tx_signed_raw_hex,
chain_spec.asdict(),
],
queue=queue,
)
s_cache.apply_async()