Merge branch 'lash/cic-eth-status-enum-bitfield' into 'master'

Translate StatusEnum to flags instead of number ranges

See merge request grassrootseconomics/cic-internal-integration!14
This commit is contained in:
Louis Holbrook 2021-02-13 17:01:48 +00:00
commit d872e78e39
16 changed files with 758 additions and 331 deletions

View File

@ -16,7 +16,10 @@ from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import Nonce
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import (
StatusEnum,
is_alive,
)
from cic_eth.error import InitializationError
from cic_eth.db.error import TxStateChangeError
from cic_eth.eth.rpc import RpcClient
@ -110,23 +113,31 @@ class AdminApi:
# TODO: This check should most likely be in resend task itself
tx_dict = s_get_tx_cache.apply_async().get()
if tx_dict['status'] in [StatusEnum.REVERTED, StatusEnum.SUCCESS, StatusEnum.CANCELLED, StatusEnum.OBSOLETED]:
#if tx_dict['status'] in [StatusEnum.REVERTED, StatusEnum.SUCCESS, StatusEnum.CANCELLED, StatusEnum.OBSOLETED]:
if not is_alive(getattr(StatusEnum, tx_dict['status']).value):
raise TxStateChangeError('Cannot resend mined or obsoleted transaction'.format(txold_hash_hex))
s = None
if in_place:
if not in_place:
raise NotImplementedError('resend as new not yet implemented')
s = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas',
[
tx_hash_hex,
chain_str,
None,
1.01,
],
queue=self.queue,
)
else:
raise NotImplementedError('resend as new not yet implemented')
s_manual = celery.signature(
'cic_eth.queue.tx.set_manual',
[
tx_hash_hex,
],
queue=self.queue,
)
s_manual.link(s)
if unlock:
s_gas = celery.signature(
@ -139,7 +150,7 @@ class AdminApi:
)
s.link(s_gas)
return s.apply_async()
return s_manual.apply_async()
def check_nonce(self, address):
s = celery.signature(

View File

@ -1,6 +1,26 @@
# standard imports
import enum
@enum.unique
class StatusBits(enum.IntEnum):
QUEUED = 0x01
IN_NETWORK = 0x08
DEFERRED = 0x10
GAS_ISSUES = 0x20
LOCAL_ERROR = 0x100
NODE_ERROR = 0x200
NETWORK_ERROR = 0x400
UNKNOWN_ERROR = 0x800
FINAL = 0x1000
OBSOLETE = 0x2000
MANUAL = 0x8000
@enum.unique
class StatusEnum(enum.IntEnum):
"""
@ -22,21 +42,27 @@ class StatusEnum(enum.IntEnum):
* SUCCESS: THe transaction was successfully mined. (Block number will be set)
"""
PENDING=-9
SENDFAIL=-8
RETRY=-7
READYSEND=-6
OBSOLETED=-2
WAITFORGAS=-1
SENT=0
FUBAR=1
CANCELLED=2
OVERRIDDEN=3
REJECTED=7
REVERTED=8
SUCCESS=9
PENDING = 0
SENDFAIL = StatusBits.DEFERRED | StatusBits.LOCAL_ERROR
RETRY = StatusBits.QUEUED | StatusBits.DEFERRED
READYSEND = StatusBits.QUEUED
OBSOLETED = StatusBits.OBSOLETE | StatusBits.IN_NETWORK
WAITFORGAS = StatusBits.GAS_ISSUES
SENT = StatusBits.IN_NETWORK
FUBAR = StatusBits.FINAL | StatusBits.UNKNOWN_ERROR
CANCELLED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.OBSOLETE
OVERRIDDEN = StatusBits.FINAL | StatusBits.OBSOLETE | StatusBits.MANUAL
REJECTED = StatusBits.NODE_ERROR | StatusBits.FINAL
REVERTED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.NETWORK_ERROR
SUCCESS = StatusBits.IN_NETWORK | StatusBits.FINAL
@enum.unique
class LockEnum(enum.IntEnum):
"""
STICKY: When set, reset is not possible
@ -48,4 +74,40 @@ class LockEnum(enum.IntEnum):
CREATE=2
SEND=4
QUEUE=8
QUERY=16
ALL=int(0xfffffffffffffffe)
def status_str(v, bits_only=False):
s = ''
if not bits_only:
try:
s = StatusEnum(v).name
return s
except ValueError:
pass
for i in range(16):
b = (1 << i)
if (b & 0xffff) & v:
n = StatusBits(b).name
if len(s) > 0:
s += ','
s += n
if not bits_only:
s += '*'
return s
def all_errors():
return StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
def is_error_status(v):
return bool(v & all_errors())
def is_alive(v):
return bool(v & (StatusBits.FINAL | StatusBits.OBSOLETE) == 0)

View File

@ -1,9 +1,14 @@
# stanard imports
import logging
# third-party imports
from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
logg = logging.getLogger()
Model = declarative_base(name='Model')
@ -21,7 +26,11 @@ class SessionBase(Model):
transactional = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
poolable = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
"""Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
procedural = True
"""Whether the database backend supports stored procedures"""
localsessions = {}
"""Contains dictionary of sessions initiated by db model components"""
@staticmethod
@ -71,3 +80,23 @@ class SessionBase(Model):
"""
SessionBase.engine.dispose()
SessionBase.engine = None
@staticmethod
def bind_session(session=None):
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
localsession_key = str(id(localsession))
logg.debug('creating new session {}'.format(localsession_key))
SessionBase.localsessions[localsession_key] = localsession
return localsession
@staticmethod
def release_session(session=None):
session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None:
logg.debug('destroying session {}'.format(session_key))
session.commit()
session.close()

View File

@ -8,7 +8,12 @@ from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
# local imports
from .base import SessionBase
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
status_str,
is_error_status,
)
from cic_eth.db.error import TxStateChangeError
#from cic_eth.eth.util import address_hex_from_signed_tx
@ -54,21 +59,24 @@ class Otx(SessionBase):
block = Column(Integer)
def __set_status(self, status, session=None):
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
def __set_status(self, status, session):
self.status |= status
session.add(self)
session.flush()
self.status = status
localsession.add(self)
localsession.flush()
if self.tracing:
self.__state_log(session=localsession)
def __reset_status(self, status, session):
status_edit = ~status & self.status
self.status &= status_edit
session.add(self)
session.flush()
if session==None:
localsession.commit()
localsession.close()
def __status_already_set(self, status):
r = bool(self.status & status)
if r:
logg.warning('status bit {} already set on {}'.format(status.name, self.tx_hash))
return r
def set_block(self, block, session=None):
@ -102,9 +110,23 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.status >= StatusEnum.SENT.value:
raise TxStateChangeError('WAITFORGAS cannot succeed final state, had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.WAITFORGAS, session)
if self.__status_already_set(StatusBits.GAS_ISSUES):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.GAS_ISSUES, session)
self.__reset_status(StatusBits.QUEUED | StatusBits.DEFERRED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def fubar(self, session=None):
@ -112,7 +134,22 @@ class Otx(SessionBase):
Only manipulates object, does not transaction or commit to backend.
"""
self.__set_status(StatusEnum.FUBAR, session)
if self.__status_already_set(StatusBits.UNKNOWN_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('FUBAR cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
raise TxStateChangeError('FUBAR cannot be set on an entry with an error state already set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.UNKNOWN_ERROR | StatusBits.FINAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def reject(self, session=None):
@ -120,20 +157,66 @@ class Otx(SessionBase):
Only manipulates object, does not transaction or commit to backend.
"""
if self.status >= StatusEnum.SENT.value:
raise TxStateChangeError('REJECTED cannot succeed SENT or final state, had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.REJECTED, session)
if self.__status_already_set(StatusBits.NODE_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('REJECTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('REJECTED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status)))
if is_error_status(self.status):
raise TxStateChangeError('REJECTED cannot be set on an entry with an error state already set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.NODE_ERROR | StatusBits.FINAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def override(self, session=None):
def override(self, manual=False, session=None):
"""Marks transaction as manually overridden.
Only manipulates object, does not transaction or commit to backend.
"""
if self.status >= StatusEnum.SENT.value:
raise TxStateChangeError('OVERRIDDEN cannot succeed SENT or final state, had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.OVERRIDDEN, session)
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status)))
if self.status & StatusBits.OBSOLETE:
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already OBSOLETE ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.OBSOLETE, session)
#if manual:
# self.__set_status(StatusBits.MANUAL, session)
self.__reset_status(StatusBits.QUEUED | StatusBits.IN_NETWORK, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def manual(self, session=None):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.MANUAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def retry(self, session=None):
"""Marks transaction as ready to retry after a timeout following a sendfail or a completed gas funding.
@ -142,9 +225,23 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.status != StatusEnum.SENT.value and self.status != StatusEnum.SENDFAIL.value:
raise TxStateChangeError('RETRY must follow SENT or SENDFAIL, but had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.RETRY, session)
if self.__status_already_set(StatusBits.QUEUED):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('RETRY cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not is_error_status(self.status) and not StatusBits.IN_NETWORK & self.status > 0:
raise TxStateChangeError('RETRY cannot be set on an entry that has no error ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.QUEUED, session)
self.__reset_status(StatusBits.GAS_ISSUES, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def readysend(self, session=None):
@ -154,9 +251,23 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.status != StatusEnum.PENDING.value and self.status != StatusEnum.WAITFORGAS.value:
raise TxStateChangeError('READYSEND must follow PENDING or WAITFORGAS, but had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.READYSEND, session)
if self.__status_already_set(StatusBits.QUEUED):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('READYSEND cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
raise TxStateChangeError('READYSEND cannot be set on an errored state ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.QUEUED, session)
self.__reset_status(StatusBits.GAS_ISSUES, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def sent(self, session=None):
@ -166,9 +277,22 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.status > StatusEnum.SENT:
raise TxStateChangeError('SENT after {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.SENT, session)
if self.__status_already_set(StatusBits.IN_NETWORK):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SENT cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.IN_NETWORK, session)
self.__reset_status(StatusBits.DEFERRED | StatusBits.QUEUED | StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR, session)
logg.debug('<<< status {}'.format(status_str(self.status)))
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def sendfail(self, session=None):
@ -178,9 +302,23 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.status not in [StatusEnum.PENDING, StatusEnum.SENT, StatusEnum.WAITFORGAS]:
raise TxStateChangeError('SENDFAIL must follow SENT or PENDING, but had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.SENDFAIL, session)
if self.__status_already_set(StatusBits.NODE_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.LOCAL_ERROR | StatusBits.DEFERRED, session)
self.__reset_status(StatusBits.QUEUED | StatusBits.GAS_ISSUES, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def minefail(self, block, session=None):
@ -192,14 +330,25 @@ class Otx(SessionBase):
:type block: number
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.NETWORK_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('REVERTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('REVERTED cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status)))
if block != None:
self.block = block
if self.status != StatusEnum.SENT:
logg.warning('REVERTED should follow SENT, but had {}'.format(StatusEnum(self.status).name))
#if self.status != StatusEnum.PENDING and self.status != StatusEnum.OBSOLETED and self.status != StatusEnum.SENT:
#if self.status > StatusEnum.SENT:
# raise TxStateChangeError('REVERTED must follow OBSOLETED, PENDING or SENT, but had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.REVERTED, session)
self.__set_status(StatusBits.NETWORK_ERROR | StatusBits.FINAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def cancel(self, confirmed=False, session=None):
@ -213,18 +362,36 @@ class Otx(SessionBase):
:type confirmed: bool
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('CANCEL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if confirmed:
if self.status != StatusEnum.OBSOLETED:
logg.warning('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
#raise TxStateChangeError('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
if not self.status & StatusBits.OBSOLETE:
raise TxStateChangeError('CANCEL can only be set on an entry marked OBSOLETE ({})'.format(status_str(self.status)))
self.__set_status(StatusEnum.CANCELLED, session)
elif self.status != StatusEnum.OBSOLETED:
if self.status > StatusEnum.SENT:
logg.warning('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
#raise TxStateChangeError('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
else:
self.__set_status(StatusEnum.OBSOLETED, session)
# if confirmed:
# if self.status != StatusEnum.OBSOLETED:
# logg.warning('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
# #raise TxStateChangeError('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
# self.__set_status(StatusEnum.CANCELLED, session)
# elif self.status != StatusEnum.OBSOLETED:
# if self.status > StatusEnum.SENT:
# logg.warning('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
# #raise TxStateChangeError('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
# self.__set_status(StatusEnum.OBSOLETED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def success(self, block, session=None):
"""Marks that transaction was successfully mined.
@ -235,13 +402,24 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SUCCESS cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('SUCCESS cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
raise TxStateChangeError('SUCCESS cannot be set on an entry with error state set ({})'.format(status_str(self.status)))
if block != None:
self.block = block
if self.status != StatusEnum.SENT:
logg.error('SUCCESS should follow SENT, but had {}'.format(StatusEnum(self.status).name))
#raise TxStateChangeError('SUCCESS must follow SENT, but had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.SUCCESS, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
@staticmethod
def get(status=0, limit=4096, status_exact=True):
@ -450,6 +628,3 @@ class OtxSync(SessionBase):
self.tx_height_session = 0
self.block_height_backlog = 0
self.tx_height_backlog = 0

View File

@ -13,7 +13,10 @@ from .rpc import RpcClient
from cic_eth.db import Otx, SessionBase
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.lock import Lock
from cic_eth.db.enum import LockEnum
from cic_eth.db.enum import (
LockEnum,
StatusBits,
)
from cic_eth.error import PermanentTxError
from cic_eth.error import TemporaryTxError
from cic_eth.error import NotLocalTxError
@ -399,9 +402,10 @@ def refill_gas(self, recipient_address, chain_str):
chain_spec = ChainSpec.from_chain_str(chain_str)
session = SessionBase.create_session()
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
q = session.query(Otx.tx_hash)
q = q.join(TxCache)
q = q.filter(Otx.status<=0)
q = q.filter(Otx.status.op('&')(StatusBits.FINAL.value)==0)
q = q.filter(TxCache.from_value!='0x00')
q = q.filter(TxCache.recipient==recipient_address)
c = q.count()
@ -495,7 +499,7 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
logg.debug('otx {} {}'.format(tx, otx.signed_tx))
logg.debug('resend otx {} {}'.format(tx, otx.signed_tx))
queue = self.request.delivery_info['routing_key']

View File

@ -6,6 +6,7 @@ import datetime
# third-party imports
import celery
from sqlalchemy import or_
from sqlalchemy import not_
from sqlalchemy import tuple_
from sqlalchemy import func
@ -16,8 +17,12 @@ from cic_eth.db.models.otx import OtxStateLog
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.lock import Lock
from cic_eth.db import SessionBase
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import LockEnum
from cic_eth.db.enum import (
StatusEnum,
LockEnum,
StatusBits,
is_alive,
)
from cic_eth.eth.util import unpack_signed_raw_tx # TODO: should not be in same sub-path as package that imports queue.tx
from cic_eth.error import NotLocalTxError
from cic_eth.error import LockedError
@ -70,10 +75,7 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predec
for otx in q.all():
logg.info('otx {} obsoleted by {}'.format(otx.tx_hash, tx_hash))
if otx.status == StatusEnum.SENT:
otx.cancel(False, session=session)
elif otx.status != StatusEnum.OBSOLETED:
otx.override(session=session)
otx.cancel(confirmed=False, session=session)
session.commit()
session.close()
@ -167,6 +169,7 @@ def set_final_status(tx_hash, block=None, fail=False):
return tx_hash
@celery_app.task()
def set_cancel(tx_hash, manual=False):
"""Used to set the status when a transaction is cancelled.
@ -250,6 +253,33 @@ def set_fubar(tx_hash):
return tx_hash
@celery_app.task()
def set_manual(tx_hash):
"""Used to set the status when queue is manually changed
Will set the state to MANUAL
:param tx_hash: Transaction hash of record to modify
:type tx_hash: str, 0x-hex
:raises NotLocalTxError: If transaction not found in queue.
"""
session = SessionBase.create_session()
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if o == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
session.flush()
o.manual(session=session)
session.commit()
session.close()
return tx_hash
@celery_app.task()
def set_ready(tx_hash):
"""Used to mark a transaction as ready to be sent to network
@ -265,14 +295,11 @@ def set_ready(tx_hash):
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
session.flush()
if o.status == StatusEnum.WAITFORGAS or o.status == StatusEnum.PENDING:
if o.status & StatusBits.GAS_ISSUES or o.status == StatusEnum.PENDING:
o.readysend(session=session)
else:
o.retry(session=session)
logg.debug('ot otx otx {} {}'.format(tx_hash, o))
session.add(o)
session.commit()
session.close()
@ -304,6 +331,7 @@ def set_waitforgas(tx_hash):
return tx_hash
@celery_app.task()
def get_state_log(tx_hash):
@ -483,13 +511,14 @@ def get_paused_txs(status=None, sender=None, chain_id=0):
q = session.query(Otx)
if status != None:
if status == StatusEnum.PENDING or status >= StatusEnum.SENT:
#if status == StatusEnum.PENDING or status >= StatusEnum.SENT:
if status == StatusEnum.PENDING or status & StatusBits.IN_NETWORK or not is_alive(status):
raise ValueError('not a valid paused tx value: {}'.format(status))
q = q.filter(Otx.status==status)
q = q.filter(Otx.status.op('&')(status.value)==status.value)
q = q.join(TxCache)
else:
q = q.filter(Otx.status>StatusEnum.PENDING)
q = q.filter(Otx.status<StatusEnum.SENT)
q = q.filter(Otx.status>StatusEnum.PENDING.value)
q = q.filter(not_(Otx.status.op('&')(StatusBits.IN_NETWORK.value)>0))
if sender != None:
q = q.filter(TxCache.sender==sender)
@ -508,7 +537,7 @@ def get_paused_txs(status=None, sender=None, chain_id=0):
return txs
def get_status_tx(status, before=None, limit=0):
def get_status_tx(status, before=None, exact=False, limit=0):
"""Retrieve transaction with a specific queue status.
:param status: Status to match transactions with
@ -525,7 +554,10 @@ def get_status_tx(status, before=None, limit=0):
q = session.query(Otx)
q = q.join(TxCache)
q = q.filter(TxCache.date_updated<before)
q = q.filter(Otx.status==status)
if exact:
q = q.filter(Otx.status==status.value)
else:
q = q.filter(Otx.status.op('&')(status.value)==status.value)
i = 0
for o in q.all():
if limit > 0 and i == limit:
@ -565,9 +597,12 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
q_outer = q_outer.join(Lock, isouter=True)
q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0))
if status >= StatusEnum.SENT:
raise ValueError('not a valid non-final tx value: {}'.format(s))
if not is_alive(status):
raise ValueError('not a valid non-final tx value: {}'.format(status))
if status == StatusEnum.PENDING:
q_outer = q_outer.filter(Otx.status==status.value)
else:
q_outer = q_outer.filter(Otx.status.op('&')(status.value)==status.value)
if recipient != None:
q_outer = q_outer.filter(TxCache.recipient==recipient)

View File

@ -2,7 +2,7 @@ web3==5.12.2
celery==4.4.7
crypto-dev-signer~=0.4.13rc2
confini~=0.3.6b1
cic-registry~=0.5.3a10
cic-registry~=0.5.3a12
cic-bancor~=0.0.6
redis==3.5.3
alembic==1.4.2

View File

@ -10,7 +10,11 @@ import web3
# local imports
from cic_eth.api import AdminApi
from cic_eth.db.models.role import AccountRole
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
status_str,
)
from cic_eth.error import InitializationError
from cic_eth.eth.task import sign_and_register_tx
from cic_eth.eth.tx import cache_gas_refill_data
@ -64,7 +68,11 @@ def test_resend_inplace(
api = AdminApi(c, queue=None)
t = api.resend(tx_dict['hash'], chain_str, unlock=True)
tx_hash_new_hex = t.get()
t.get()
i = 0
tx_hash_new_hex = None
for r in t.collect():
tx_hash_new_hex = r[1]
assert t.successful()
tx_raw_new = get_tx(tx_hash_new_hex)
@ -74,142 +82,144 @@ def test_resend_inplace(
assert tx_dict_new['gasPrice'] > gas_price_before
tx_dict_after = get_tx(tx_dict['hash'])
assert tx_dict_after['status'] == StatusEnum.OVERRIDDEN
logg.debug('logggg {}'.format(status_str(tx_dict_after['status'])))
assert tx_dict_after['status'] & StatusBits.MANUAL
def test_check_fix_nonce(
default_chain_spec,
init_database,
init_eth_account_roles,
init_w3,
eth_empty_accounts,
celery_session_worker,
):
chain_str = str(default_chain_spec)
sigs = []
for i in range(5):
s = celery.signature(
'cic_eth.eth.tx.refill_gas',
[
eth_empty_accounts[i],
chain_str,
],
queue=None,
)
sigs.append(s)
t = celery.group(sigs)()
txs = t.get()
assert t.successful()
tx_hash = web3.Web3.keccak(hexstr=txs[2])
c = RpcClient(default_chain_spec)
api = AdminApi(c, queue=None)
address = init_eth_account_roles['eth_account_gas_provider']
nonce_spec = api.check_nonce(address)
assert nonce_spec['nonce']['network'] == 0
assert nonce_spec['nonce']['queue'] == 4
assert nonce_spec['nonce']['blocking'] == None
s_set = celery.signature(
'cic_eth.queue.tx.set_rejected',
[
tx_hash.hex(),
],
queue=None,
)
t = s_set.apply_async()
t.get()
t.collect()
assert t.successful()
nonce_spec = api.check_nonce(address)
assert nonce_spec['nonce']['blocking'] == 2
assert nonce_spec['tx']['blocking'] == tx_hash.hex()
t = api.fix_nonce(address, nonce_spec['nonce']['blocking'])
t.get()
t.collect()
assert t.successful()
for tx in txs[3:]:
tx_hash = web3.Web3.keccak(hexstr=tx)
tx_dict = get_tx(tx_hash.hex())
assert tx_dict['status'] == StatusEnum.OVERRIDDEN
def test_tag_account(
init_database,
eth_empty_accounts,
init_rpc,
):
api = AdminApi(init_rpc)
api.tag_account('foo', eth_empty_accounts[0])
api.tag_account('bar', eth_empty_accounts[1])
api.tag_account('bar', eth_empty_accounts[2])
assert AccountRole.get_address('foo') == eth_empty_accounts[0]
assert AccountRole.get_address('bar') == eth_empty_accounts[2]
def test_ready(
init_database,
eth_empty_accounts,
init_rpc,
w3,
):
api = AdminApi(init_rpc)
with pytest.raises(InitializationError):
api.ready()
bogus_account = os.urandom(20)
bogus_account_hex = '0x' + bogus_account.hex()
api.tag_account('ETH_GAS_PROVIDER_ADDRESS', web3.Web3.toChecksumAddress(bogus_account_hex))
with pytest.raises(KeyError):
api.ready()
api.tag_account('ETH_GAS_PROVIDER_ADDRESS', eth_empty_accounts[0])
api.ready()
def test_tx(
default_chain_spec,
cic_registry,
init_database,
init_rpc,
init_w3,
celery_session_worker,
):
tx = {
'from': init_w3.eth.accounts[0],
'to': init_w3.eth.accounts[1],
'nonce': 42,
'gas': 21000,
'gasPrice': 1000000,
'value': 128,
'chainId': default_chain_spec.chain_id(),
'data': '',
}
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
queue_create(
tx['nonce'],
tx['from'],
tx_hash_hex,
tx_signed_raw_hex,
str(default_chain_spec),
)
tx_recovered = unpack_signed_raw_tx(bytes.fromhex(tx_signed_raw_hex[2:]), default_chain_spec.chain_id())
cache_gas_refill_data(tx_hash_hex, tx_recovered)
api = AdminApi(init_rpc, queue=None)
tx = api.tx(default_chain_spec, tx_hash=tx_hash_hex)
#def test_check_fix_nonce(
# default_chain_spec,
# init_database,
# init_eth_account_roles,
# init_w3,
# eth_empty_accounts,
# celery_session_worker,
# ):
#
# chain_str = str(default_chain_spec)
#
# sigs = []
# for i in range(5):
# s = celery.signature(
# 'cic_eth.eth.tx.refill_gas',
# [
# eth_empty_accounts[i],
# chain_str,
# ],
# queue=None,
# )
# sigs.append(s)
#
# t = celery.group(sigs)()
# txs = t.get()
# assert t.successful()
#
# tx_hash = web3.Web3.keccak(hexstr=txs[2])
# c = RpcClient(default_chain_spec)
# api = AdminApi(c, queue=None)
# address = init_eth_account_roles['eth_account_gas_provider']
# nonce_spec = api.check_nonce(address)
# assert nonce_spec['nonce']['network'] == 0
# assert nonce_spec['nonce']['queue'] == 4
# assert nonce_spec['nonce']['blocking'] == None
#
# s_set = celery.signature(
# 'cic_eth.queue.tx.set_rejected',
# [
# tx_hash.hex(),
# ],
# queue=None,
# )
# t = s_set.apply_async()
# t.get()
# t.collect()
# assert t.successful()
#
#
# nonce_spec = api.check_nonce(address)
# assert nonce_spec['nonce']['blocking'] == 2
# assert nonce_spec['tx']['blocking'] == tx_hash.hex()
#
# t = api.fix_nonce(address, nonce_spec['nonce']['blocking'])
# t.get()
# t.collect()
# assert t.successful()
#
# for tx in txs[3:]:
# tx_hash = web3.Web3.keccak(hexstr=tx)
# tx_dict = get_tx(tx_hash.hex())
# assert tx_dict['status'] == StatusEnum.OVERRIDDEN
#
#
#def test_tag_account(
# init_database,
# eth_empty_accounts,
# init_rpc,
# ):
#
# api = AdminApi(init_rpc)
#
# api.tag_account('foo', eth_empty_accounts[0])
# api.tag_account('bar', eth_empty_accounts[1])
# api.tag_account('bar', eth_empty_accounts[2])
#
# assert AccountRole.get_address('foo') == eth_empty_accounts[0]
# assert AccountRole.get_address('bar') == eth_empty_accounts[2]
#
#
#def test_ready(
# init_database,
# eth_empty_accounts,
# init_rpc,
# w3,
# ):
#
# api = AdminApi(init_rpc)
#
# with pytest.raises(InitializationError):
# api.ready()
#
# bogus_account = os.urandom(20)
# bogus_account_hex = '0x' + bogus_account.hex()
#
# api.tag_account('ETH_GAS_PROVIDER_ADDRESS', web3.Web3.toChecksumAddress(bogus_account_hex))
# with pytest.raises(KeyError):
# api.ready()
#
# api.tag_account('ETH_GAS_PROVIDER_ADDRESS', eth_empty_accounts[0])
# api.ready()
#
#
#def test_tx(
# default_chain_spec,
# cic_registry,
# init_database,
# init_rpc,
# init_w3,
# celery_session_worker,
# ):
#
# tx = {
# 'from': init_w3.eth.accounts[0],
# 'to': init_w3.eth.accounts[1],
# 'nonce': 42,
# 'gas': 21000,
# 'gasPrice': 1000000,
# 'value': 128,
# 'chainId': default_chain_spec.chain_id(),
# 'data': '',
# }
#
# (tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
# queue_create(
# tx['nonce'],
# tx['from'],
# tx_hash_hex,
# tx_signed_raw_hex,
# str(default_chain_spec),
# )
# tx_recovered = unpack_signed_raw_tx(bytes.fromhex(tx_signed_raw_hex[2:]), default_chain_spec.chain_id())
# cache_gas_refill_data(tx_hash_hex, tx_recovered)
#
# api = AdminApi(init_rpc, queue=None)
# tx = api.tx(default_chain_spec, tx_hash=tx_hash_hex)

View File

@ -10,7 +10,10 @@ from cic_registry import zero_address
# local imports
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
)
logg = logging.getLogger()
@ -169,6 +172,9 @@ def test_status_fubar(
)
t = s.apply_async()
t.get()
for n in t.collect():
pass
assert t.successful()
init_database.refresh(otx)
assert otx.status == StatusEnum.FUBAR
otx = Otx.load(tx_hash)
assert otx.status & StatusBits.UNKNOWN_ERROR

View File

@ -8,7 +8,11 @@ import celery
# local imports
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.otx import Otx
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
is_error_status,
)
from cic_eth.eth.task import sign_and_register_tx
logg = logging.getLogger()
@ -101,7 +105,7 @@ def test_states_failed(
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first()
otx.sendfail(session=init_database)
init_database.add(otx)
init_database.commit()
s = celery.signature(
@ -121,5 +125,9 @@ def test_states_failed(
pass
assert t.successful()
init_database.commit()
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first()
assert otx.status == StatusEnum.RETRY.value
assert otx.status & StatusEnum.RETRY == StatusEnum.RETRY
#assert otx.status & StatusBits.QUEUED
assert is_error_status(otx.status)

View File

@ -24,7 +24,6 @@ class Response:
status = 200
@pytest.mark.skip()
def test_callback_http(
celery_session_worker,
mocker,
@ -43,7 +42,6 @@ def test_callback_http(
t.get()
@pytest.mark.skip()
def test_callback_tcp(
celery_session_worker,
):

View File

@ -0,0 +1,20 @@
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
status_str,
)
def test_status_str():
# String representation for a status in StatusEnum
s = status_str(StatusEnum.REVERTED)
assert s == 'REVERTED'
# String representation for a status not in StatusEnum
s = status_str(StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR)
assert s == 'LOCAL_ERROR,NODE_ERROR*'
# String representation for a status in StatusEnum, but bits only representation bit set
s = status_str(StatusEnum.REVERTED, bits_only=True)
assert s == 'IN_NETWORK,NETWORK_ERROR,FINAL'

View File

@ -9,7 +9,11 @@ import pytest
from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.otx import OtxStateLog
from cic_eth.db.models.otx import Otx
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
is_alive,
)
logg = logging.getLogger()
@ -70,8 +74,16 @@ def test_state_log(
otx = Otx.add(0, address, tx_hash, signed_tx, session=init_database)
otx.waitforgas(session=init_database)
init_database.commit()
otx.readysend(session=init_database)
init_database.commit()
otx.sent(session=init_database)
init_database.commit()
otx.success(1024, session=init_database)
init_database.commit()
q = init_database.query(OtxStateLog)
q = q.filter(OtxStateLog.otx_id==otx.id)
@ -80,5 +92,6 @@ def test_state_log(
assert logs[0].status == StatusEnum.PENDING
assert logs[1].status == StatusEnum.WAITFORGAS
assert logs[2].status == StatusEnum.SENT
assert logs[3].status == StatusEnum.SUCCESS
assert logs[2].status & StatusBits.QUEUED
assert logs[3].status & StatusBits.IN_NETWORK
assert not is_alive(logs[4].status)

View File

@ -1,55 +0,0 @@
# standard imports
import logging
# third-party imports
import pytest
# local imports
from cic_eth.db import Otx
from cic_eth.db.error import TxStateChangeError
logg = logging.getLogger()
# Check that invalid transitions throw exceptions
# sent
def test_db_queue_states(
init_database,
):
session = init_database
# these values are completely arbitary
tx_hash = '0xF182DFA3AD48723E7E222FE7B4C2C44C23CD4D7FF413E8999DFA15ECE53F'
address = '0x38C5559D6EDDDA1F705D3AB1A664CA1B397EB119'
signed_tx = '0xA5866A5383249AE843546BDA46235A1CA1614F538FB486140693C2EF1956FC53213F6AEF0F99F44D7103871AF3A12B126DCF9BFB7AF11143FAB3ECE2B452EE35D1320C4C7C6F999C8DF4EB09E729715B573F6672ED852547F552C4AE99D17DCD14C810'
o = Otx(
nonce=42,
address=address[2:],
tx_hash=tx_hash[2:],
signed_tx=signed_tx[2:],
)
session.add(o)
session.commit()
o.sent(session=session)
session.commit()
# send after sent is ok
o.sent(session=session)
session.commit()
o.sendfail(session=session)
session.commit()
with pytest.raises(TxStateChangeError):
o.sendfail(session=session)
o.sent(session=session)
session.commit()
o.minefail(1234, session=session)
session.commit()
with pytest.raises(TxStateChangeError):
o.sent(session=session)

View File

@ -0,0 +1,97 @@
# standard imports
import os
# third-party imports
import pytest
# local imports
from cic_eth.db.models.otx import Otx
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
is_alive,
)
@pytest.fixture(scope='function')
def otx(
init_database,
):
bogus_hash = '0x' + os.urandom(32).hex()
bogus_address = '0x' + os.urandom(20).hex()
bogus_tx_raw = '0x' + os.urandom(128).hex()
return Otx(0, bogus_address, bogus_hash, bogus_tx_raw)
def test_status_chain_gas(
init_database,
otx,
):
otx.waitforgas(init_database)
otx.readysend(init_database)
otx.sent(init_database)
otx.success(1024, init_database)
assert not is_alive(otx.status)
def test_status_chain_straight_success(
init_database,
otx,
):
otx.readysend(init_database)
otx.sent(init_database)
otx.success(1024, init_database)
assert not is_alive(otx.status)
def test_status_chain_straight_revert(
init_database,
otx,
):
otx.readysend(init_database)
otx.sent(init_database)
otx.minefail(1024, init_database)
assert not is_alive(otx.status)
def test_status_chain_nodeerror(
init_database,
otx,
):
otx.readysend(init_database)
otx.sendfail(init_database)
otx.retry(init_database)
otx.sent(init_database)
otx.success(1024, init_database)
assert not is_alive(otx.status)
def test_status_chain_nodeerror_multiple(
init_database,
otx,
):
otx.readysend(init_database)
otx.sendfail(init_database)
otx.retry(init_database)
otx.sendfail(init_database)
otx.retry(init_database)
otx.sent(init_database)
otx.success(1024, init_database)
assert not is_alive(otx.status)
def test_status_chain_nodeerror(
init_database,
otx,
):
otx.readysend(init_database)
otx.reject(init_database)
assert not is_alive(otx.status)

View File

@ -16,8 +16,14 @@ from cic_eth.db.models.otx import OtxSync
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.lock import Lock
from cic_eth.db.models.base import SessionBase
from cic_eth.db.enum import StatusEnum
from cic_eth.db.enum import LockEnum
from cic_eth.db.enum import (
StatusEnum,
LockEnum,
StatusBits,
is_alive,
is_error_status,
status_str,
)
from cic_eth.queue.tx import create as queue_create
from cic_eth.queue.tx import set_final_status
from cic_eth.queue.tx import set_sent_status
@ -63,13 +69,14 @@ def test_finalize(
set_sent_status(tx_hash.hex())
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[0]).first()
assert otx.status == StatusEnum.OBSOLETED
assert otx.status & StatusBits.OBSOLETE
assert not is_alive(otx.status)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[1]).first()
assert otx.status == StatusEnum.OBSOLETED
assert otx.status & StatusBits.OBSOLETE
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[2]).first()
assert otx.status == StatusEnum.OBSOLETED
assert otx.status & StatusBits.OBSOLETE
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[3]).first()
assert otx.status == StatusEnum.PENDING
@ -82,19 +89,22 @@ def test_finalize(
set_final_status(tx_hashes[3], 1024)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[0]).first()
assert otx.status == StatusEnum.CANCELLED
assert otx.status & (StatusBits.OBSOLETE | StatusBits.FINAL)
assert not is_alive(otx.status)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[1]).first()
assert otx.status == StatusEnum.CANCELLED
assert otx.status & (StatusBits.OBSOLETE | StatusBits.FINAL)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[2]).first()
assert otx.status == StatusEnum.CANCELLED
assert otx.status & (StatusBits.OBSOLETE | StatusBits.FINAL)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[3]).first()
assert otx.status == StatusEnum.SUCCESS
assert otx.status & (StatusBits.IN_NETWORK | StatusBits.FINAL)
assert not is_error_status(otx.status)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[4]).first()
assert otx.status == StatusEnum.SENT
assert otx.status & (StatusBits.IN_NETWORK | StatusBits.FINAL)
assert not is_error_status(otx.status)
def test_expired(
@ -404,7 +414,7 @@ def test_obsoletion(
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.status==StatusEnum.OBSOLETED)
q = q.filter(Otx.status.op('&')(StatusEnum.OBSOLETED.value)==StatusEnum.OBSOLETED.value)
z = 0
for o in q.all():
z += o.nonce
@ -416,13 +426,13 @@ def test_obsoletion(
session = SessionBase.create_session()
q = session.query(Otx)
q = q.filter(Otx.status==StatusEnum.OBSOLETED)
q = q.filter(Otx.status.op('&')(StatusEnum.CANCELLED.value)==StatusEnum.OBSOLETED.value)
zo = 0
for o in q.all():
zo += o.nonce
q = session.query(Otx)
q = q.filter(Otx.status==StatusEnum.CANCELLED)
q = q.filter(Otx.status.op('&')(StatusEnum.CANCELLED.value)==StatusEnum.CANCELLED.value)
zc = 0
for o in q.all():
zc += o.nonce
@ -450,16 +460,20 @@ def test_retry(
q = q.filter(Otx.tx_hash==tx_hash)
otx = q.first()
assert otx.status == StatusEnum.RETRY
assert (otx.status & StatusEnum.RETRY.value) == StatusEnum.RETRY.value
assert is_error_status(otx.status)
set_sent_status(tx_hash, False)
set_ready(tx_hash)
init_database.commit()
q = init_database.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash)
otx = q.first()
assert otx.status == StatusEnum.RETRY
assert (otx.status & StatusEnum.RETRY.value) == StatusBits.QUEUED.value
assert not is_error_status(otx.status)
def test_get_account_tx(