Translate StatusEnum to flags instead of number ranges

This commit is contained in:
Louis Holbrook 2021-02-13 17:01:48 +00:00
parent d042ce0dcd
commit 14f29c4c32
16 changed files with 758 additions and 331 deletions

View File

@ -16,7 +16,10 @@ from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.otx import Otx from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.nonce import Nonce from cic_eth.db.models.nonce import Nonce
from cic_eth.db.enum import StatusEnum from cic_eth.db.enum import (
StatusEnum,
is_alive,
)
from cic_eth.error import InitializationError from cic_eth.error import InitializationError
from cic_eth.db.error import TxStateChangeError from cic_eth.db.error import TxStateChangeError
from cic_eth.eth.rpc import RpcClient from cic_eth.eth.rpc import RpcClient
@ -110,23 +113,31 @@ class AdminApi:
# TODO: This check should most likely be in resend task itself # TODO: This check should most likely be in resend task itself
tx_dict = s_get_tx_cache.apply_async().get() tx_dict = s_get_tx_cache.apply_async().get()
if tx_dict['status'] in [StatusEnum.REVERTED, StatusEnum.SUCCESS, StatusEnum.CANCELLED, StatusEnum.OBSOLETED]: #if tx_dict['status'] in [StatusEnum.REVERTED, StatusEnum.SUCCESS, StatusEnum.CANCELLED, StatusEnum.OBSOLETED]:
if not is_alive(getattr(StatusEnum, tx_dict['status']).value):
raise TxStateChangeError('Cannot resend mined or obsoleted transaction'.format(txold_hash_hex)) raise TxStateChangeError('Cannot resend mined or obsoleted transaction'.format(txold_hash_hex))
s = None if not in_place:
if in_place: raise NotImplementedError('resend as new not yet implemented')
s = celery.signature( s = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas', 'cic_eth.eth.tx.resend_with_higher_gas',
[ [
tx_hash_hex,
chain_str, chain_str,
None, None,
1.01, 1.01,
], ],
queue=self.queue, queue=self.queue,
) )
else:
raise NotImplementedError('resend as new not yet implemented') s_manual = celery.signature(
'cic_eth.queue.tx.set_manual',
[
tx_hash_hex,
],
queue=self.queue,
)
s_manual.link(s)
if unlock: if unlock:
s_gas = celery.signature( s_gas = celery.signature(
@ -139,7 +150,7 @@ class AdminApi:
) )
s.link(s_gas) s.link(s_gas)
return s.apply_async() return s_manual.apply_async()
def check_nonce(self, address): def check_nonce(self, address):
s = celery.signature( s = celery.signature(

View File

@ -1,6 +1,26 @@
# standard imports # standard imports
import enum import enum
@enum.unique
class StatusBits(enum.IntEnum):
QUEUED = 0x01
IN_NETWORK = 0x08
DEFERRED = 0x10
GAS_ISSUES = 0x20
LOCAL_ERROR = 0x100
NODE_ERROR = 0x200
NETWORK_ERROR = 0x400
UNKNOWN_ERROR = 0x800
FINAL = 0x1000
OBSOLETE = 0x2000
MANUAL = 0x8000
@enum.unique
class StatusEnum(enum.IntEnum): class StatusEnum(enum.IntEnum):
""" """
@ -22,21 +42,27 @@ class StatusEnum(enum.IntEnum):
* SUCCESS: THe transaction was successfully mined. (Block number will be set) * SUCCESS: THe transaction was successfully mined. (Block number will be set)
""" """
PENDING=-9 PENDING = 0
SENDFAIL=-8
RETRY=-7 SENDFAIL = StatusBits.DEFERRED | StatusBits.LOCAL_ERROR
READYSEND=-6 RETRY = StatusBits.QUEUED | StatusBits.DEFERRED
OBSOLETED=-2 READYSEND = StatusBits.QUEUED
WAITFORGAS=-1
SENT=0 OBSOLETED = StatusBits.OBSOLETE | StatusBits.IN_NETWORK
FUBAR=1
CANCELLED=2 WAITFORGAS = StatusBits.GAS_ISSUES
OVERRIDDEN=3
REJECTED=7 SENT = StatusBits.IN_NETWORK
REVERTED=8 FUBAR = StatusBits.FINAL | StatusBits.UNKNOWN_ERROR
SUCCESS=9 CANCELLED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.OBSOLETE
OVERRIDDEN = StatusBits.FINAL | StatusBits.OBSOLETE | StatusBits.MANUAL
REJECTED = StatusBits.NODE_ERROR | StatusBits.FINAL
REVERTED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.NETWORK_ERROR
SUCCESS = StatusBits.IN_NETWORK | StatusBits.FINAL
@enum.unique
class LockEnum(enum.IntEnum): class LockEnum(enum.IntEnum):
""" """
STICKY: When set, reset is not possible STICKY: When set, reset is not possible
@ -48,4 +74,40 @@ class LockEnum(enum.IntEnum):
CREATE=2 CREATE=2
SEND=4 SEND=4
QUEUE=8 QUEUE=8
QUERY=16
ALL=int(0xfffffffffffffffe) ALL=int(0xfffffffffffffffe)
def status_str(v, bits_only=False):
s = ''
if not bits_only:
try:
s = StatusEnum(v).name
return s
except ValueError:
pass
for i in range(16):
b = (1 << i)
if (b & 0xffff) & v:
n = StatusBits(b).name
if len(s) > 0:
s += ','
s += n
if not bits_only:
s += '*'
return s
def all_errors():
return StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
def is_error_status(v):
return bool(v & all_errors())
def is_alive(v):
return bool(v & (StatusBits.FINAL | StatusBits.OBSOLETE) == 0)

View File

@ -1,9 +1,14 @@
# stanard imports
import logging
# third-party imports # third-party imports
from sqlalchemy import Column, Integer from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
logg = logging.getLogger()
Model = declarative_base(name='Model') Model = declarative_base(name='Model')
@ -21,7 +26,11 @@ class SessionBase(Model):
transactional = True transactional = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code""" """Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
poolable = True poolable = True
"""Whether the database backend supports query transactions. Should be explicitly set by initialization code""" """Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
procedural = True
"""Whether the database backend supports stored procedures"""
localsessions = {}
"""Contains dictionary of sessions initiated by db model components"""
@staticmethod @staticmethod
@ -71,3 +80,23 @@ class SessionBase(Model):
""" """
SessionBase.engine.dispose() SessionBase.engine.dispose()
SessionBase.engine = None SessionBase.engine = None
@staticmethod
def bind_session(session=None):
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
localsession_key = str(id(localsession))
logg.debug('creating new session {}'.format(localsession_key))
SessionBase.localsessions[localsession_key] = localsession
return localsession
@staticmethod
def release_session(session=None):
session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None:
logg.debug('destroying session {}'.format(session_key))
session.commit()
session.close()

View File

@ -8,7 +8,12 @@ from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
# local imports # local imports
from .base import SessionBase from .base import SessionBase
from cic_eth.db.enum import StatusEnum from cic_eth.db.enum import (
StatusEnum,
StatusBits,
status_str,
is_error_status,
)
from cic_eth.db.error import TxStateChangeError from cic_eth.db.error import TxStateChangeError
#from cic_eth.eth.util import address_hex_from_signed_tx #from cic_eth.eth.util import address_hex_from_signed_tx
@ -54,21 +59,24 @@ class Otx(SessionBase):
block = Column(Integer) block = Column(Integer)
def __set_status(self, status, session=None): def __set_status(self, status, session):
localsession = session self.status |= status
if localsession == None: session.add(self)
localsession = SessionBase.create_session() session.flush()
self.status = status
localsession.add(self)
localsession.flush()
if self.tracing: def __reset_status(self, status, session):
self.__state_log(session=localsession) status_edit = ~status & self.status
self.status &= status_edit
session.add(self)
session.flush()
if session==None:
localsession.commit() def __status_already_set(self, status):
localsession.close() r = bool(self.status & status)
if r:
logg.warning('status bit {} already set on {}'.format(status.name, self.tx_hash))
return r
def set_block(self, block, session=None): def set_block(self, block, session=None):
@ -102,9 +110,23 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
""" """
if self.status >= StatusEnum.SENT.value: if self.__status_already_set(StatusBits.GAS_ISSUES):
raise TxStateChangeError('WAITFORGAS cannot succeed final state, had {}'.format(StatusEnum(self.status).name)) return
self.__set_status(StatusEnum.WAITFORGAS, session)
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('GAS_ISSUES cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.GAS_ISSUES, session)
self.__reset_status(StatusBits.QUEUED | StatusBits.DEFERRED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def fubar(self, session=None): def fubar(self, session=None):
@ -112,7 +134,22 @@ class Otx(SessionBase):
Only manipulates object, does not transaction or commit to backend. Only manipulates object, does not transaction or commit to backend.
""" """
self.__set_status(StatusEnum.FUBAR, session) if self.__status_already_set(StatusBits.UNKNOWN_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('FUBAR cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
raise TxStateChangeError('FUBAR cannot be set on an entry with an error state already set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.UNKNOWN_ERROR | StatusBits.FINAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def reject(self, session=None): def reject(self, session=None):
@ -120,20 +157,66 @@ class Otx(SessionBase):
Only manipulates object, does not transaction or commit to backend. Only manipulates object, does not transaction or commit to backend.
""" """
if self.status >= StatusEnum.SENT.value: if self.__status_already_set(StatusBits.NODE_ERROR):
raise TxStateChangeError('REJECTED cannot succeed SENT or final state, had {}'.format(StatusEnum(self.status).name)) return
self.__set_status(StatusEnum.REJECTED, session)
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('REJECTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('REJECTED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status)))
if is_error_status(self.status):
raise TxStateChangeError('REJECTED cannot be set on an entry with an error state already set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.NODE_ERROR | StatusBits.FINAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def override(self, session=None): def override(self, manual=False, session=None):
"""Marks transaction as manually overridden. """Marks transaction as manually overridden.
Only manipulates object, does not transaction or commit to backend. Only manipulates object, does not transaction or commit to backend.
""" """
if self.status >= StatusEnum.SENT.value:
raise TxStateChangeError('OVERRIDDEN cannot succeed SENT or final state, had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.OVERRIDDEN, session)
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already IN_NETWORK ({})'.format(status_str(self.status)))
if self.status & StatusBits.OBSOLETE:
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already OBSOLETE ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.OBSOLETE, session)
#if manual:
# self.__set_status(StatusBits.MANUAL, session)
self.__reset_status(StatusBits.QUEUED | StatusBits.IN_NETWORK, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def manual(self, session=None):
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.MANUAL, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def retry(self, session=None): def retry(self, session=None):
"""Marks transaction as ready to retry after a timeout following a sendfail or a completed gas funding. """Marks transaction as ready to retry after a timeout following a sendfail or a completed gas funding.
@ -142,9 +225,23 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
""" """
if self.status != StatusEnum.SENT.value and self.status != StatusEnum.SENDFAIL.value: if self.__status_already_set(StatusBits.QUEUED):
raise TxStateChangeError('RETRY must follow SENT or SENDFAIL, but had {}'.format(StatusEnum(self.status).name)) return
self.__set_status(StatusEnum.RETRY, session)
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('RETRY cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not is_error_status(self.status) and not StatusBits.IN_NETWORK & self.status > 0:
raise TxStateChangeError('RETRY cannot be set on an entry that has no error ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.QUEUED, session)
self.__reset_status(StatusBits.GAS_ISSUES, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def readysend(self, session=None): def readysend(self, session=None):
@ -154,9 +251,23 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
""" """
if self.status != StatusEnum.PENDING.value and self.status != StatusEnum.WAITFORGAS.value: if self.__status_already_set(StatusBits.QUEUED):
raise TxStateChangeError('READYSEND must follow PENDING or WAITFORGAS, but had {}'.format(StatusEnum(self.status).name)) return
self.__set_status(StatusEnum.READYSEND, session)
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('READYSEND cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
raise TxStateChangeError('READYSEND cannot be set on an errored state ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.QUEUED, session)
self.__reset_status(StatusBits.GAS_ISSUES, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def sent(self, session=None): def sent(self, session=None):
@ -166,9 +277,22 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
""" """
if self.status > StatusEnum.SENT: if self.__status_already_set(StatusBits.IN_NETWORK):
raise TxStateChangeError('SENT after {}'.format(StatusEnum(self.status).name)) return
self.__set_status(StatusEnum.SENT, session)
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SENT cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.IN_NETWORK, session)
self.__reset_status(StatusBits.DEFERRED | StatusBits.QUEUED | StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR, session)
logg.debug('<<< status {}'.format(status_str(self.status)))
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def sendfail(self, session=None): def sendfail(self, session=None):
@ -178,9 +302,23 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
""" """
if self.status not in [StatusEnum.PENDING, StatusEnum.SENT, StatusEnum.WAITFORGAS]: if self.__status_already_set(StatusBits.NODE_ERROR):
raise TxStateChangeError('SENDFAIL must follow SENT or PENDING, but had {}'.format(StatusEnum(self.status).name)) return
self.__set_status(StatusEnum.SENDFAIL, session)
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('SENDFAIL cannot be set on an entry with IN_NETWORK state set ({})'.format(status_str(self.status)))
self.__set_status(StatusBits.LOCAL_ERROR | StatusBits.DEFERRED, session)
self.__reset_status(StatusBits.QUEUED | StatusBits.GAS_ISSUES, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def minefail(self, block, session=None): def minefail(self, block, session=None):
@ -192,14 +330,25 @@ class Otx(SessionBase):
:type block: number :type block: number
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
""" """
if self.__status_already_set(StatusBits.NETWORK_ERROR):
return
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('REVERTED cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('REVERTED cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status)))
if block != None: if block != None:
self.block = block self.block = block
if self.status != StatusEnum.SENT:
logg.warning('REVERTED should follow SENT, but had {}'.format(StatusEnum(self.status).name)) self.__set_status(StatusBits.NETWORK_ERROR | StatusBits.FINAL, session)
#if self.status != StatusEnum.PENDING and self.status != StatusEnum.OBSOLETED and self.status != StatusEnum.SENT:
#if self.status > StatusEnum.SENT: if self.tracing:
# raise TxStateChangeError('REVERTED must follow OBSOLETED, PENDING or SENT, but had {}'.format(StatusEnum(self.status).name)) self.__state_log(session=session)
self.__set_status(StatusEnum.REVERTED, session)
SessionBase.release_session(session)
def cancel(self, confirmed=False, session=None): def cancel(self, confirmed=False, session=None):
@ -213,18 +362,36 @@ class Otx(SessionBase):
:type confirmed: bool :type confirmed: bool
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
""" """
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('CANCEL cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if confirmed: if confirmed:
if self.status != StatusEnum.OBSOLETED: if not self.status & StatusBits.OBSOLETE:
logg.warning('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name)) raise TxStateChangeError('CANCEL can only be set on an entry marked OBSOLETE ({})'.format(status_str(self.status)))
#raise TxStateChangeError('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.CANCELLED, session) self.__set_status(StatusEnum.CANCELLED, session)
elif self.status != StatusEnum.OBSOLETED: else:
if self.status > StatusEnum.SENT:
logg.warning('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
#raise TxStateChangeError('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.OBSOLETED, session) self.__set_status(StatusEnum.OBSOLETED, session)
# if confirmed:
# if self.status != StatusEnum.OBSOLETED:
# logg.warning('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
# #raise TxStateChangeError('CANCELLED must follow OBSOLETED, but had {}'.format(StatusEnum(self.status).name))
# self.__set_status(StatusEnum.CANCELLED, session)
# elif self.status != StatusEnum.OBSOLETED:
# if self.status > StatusEnum.SENT:
# logg.warning('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
# #raise TxStateChangeError('OBSOLETED must follow PENDING, SENDFAIL or SENT, but had {}'.format(StatusEnum(self.status).name))
# self.__set_status(StatusEnum.OBSOLETED, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
def success(self, block, session=None): def success(self, block, session=None):
"""Marks that transaction was successfully mined. """Marks that transaction was successfully mined.
@ -235,13 +402,24 @@ class Otx(SessionBase):
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist. :raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
""" """
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
raise TxStateChangeError('SUCCESS cannot be set on an entry with FINAL state set ({})'.format(status_str(self.status)))
if not self.status & StatusBits.IN_NETWORK:
raise TxStateChangeError('SUCCESS cannot be set on an entry without IN_NETWORK state set ({})'.format(status_str(self.status)))
if is_error_status(self.status):
raise TxStateChangeError('SUCCESS cannot be set on an entry with error state set ({})'.format(status_str(self.status)))
if block != None: if block != None:
self.block = block self.block = block
if self.status != StatusEnum.SENT:
logg.error('SUCCESS should follow SENT, but had {}'.format(StatusEnum(self.status).name))
#raise TxStateChangeError('SUCCESS must follow SENT, but had {}'.format(StatusEnum(self.status).name))
self.__set_status(StatusEnum.SUCCESS, session) self.__set_status(StatusEnum.SUCCESS, session)
if self.tracing:
self.__state_log(session=session)
SessionBase.release_session(session)
@staticmethod @staticmethod
def get(status=0, limit=4096, status_exact=True): def get(status=0, limit=4096, status_exact=True):
@ -450,6 +628,3 @@ class OtxSync(SessionBase):
self.tx_height_session = 0 self.tx_height_session = 0
self.block_height_backlog = 0 self.block_height_backlog = 0
self.tx_height_backlog = 0 self.tx_height_backlog = 0

View File

@ -13,7 +13,10 @@ from .rpc import RpcClient
from cic_eth.db import Otx, SessionBase from cic_eth.db import Otx, SessionBase
from cic_eth.db.models.tx import TxCache from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.lock import Lock from cic_eth.db.models.lock import Lock
from cic_eth.db.enum import LockEnum from cic_eth.db.enum import (
LockEnum,
StatusBits,
)
from cic_eth.error import PermanentTxError from cic_eth.error import PermanentTxError
from cic_eth.error import TemporaryTxError from cic_eth.error import TemporaryTxError
from cic_eth.error import NotLocalTxError from cic_eth.error import NotLocalTxError
@ -399,9 +402,10 @@ def refill_gas(self, recipient_address, chain_str):
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_chain_str(chain_str)
session = SessionBase.create_session() session = SessionBase.create_session()
status_filter = StatusBits.FINAL | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
q = session.query(Otx.tx_hash) q = session.query(Otx.tx_hash)
q = q.join(TxCache) q = q.join(TxCache)
q = q.filter(Otx.status<=0) q = q.filter(Otx.status.op('&')(StatusBits.FINAL.value)==0)
q = q.filter(TxCache.from_value!='0x00') q = q.filter(TxCache.from_value!='0x00')
q = q.filter(TxCache.recipient==recipient_address) q = q.filter(TxCache.recipient==recipient_address)
c = q.count() c = q.count()
@ -495,7 +499,7 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx[2:]) tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id()) tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
logg.debug('otx {} {}'.format(tx, otx.signed_tx)) logg.debug('resend otx {} {}'.format(tx, otx.signed_tx))
queue = self.request.delivery_info['routing_key'] queue = self.request.delivery_info['routing_key']

View File

@ -6,6 +6,7 @@ import datetime
# third-party imports # third-party imports
import celery import celery
from sqlalchemy import or_ from sqlalchemy import or_
from sqlalchemy import not_
from sqlalchemy import tuple_ from sqlalchemy import tuple_
from sqlalchemy import func from sqlalchemy import func
@ -16,8 +17,12 @@ from cic_eth.db.models.otx import OtxStateLog
from cic_eth.db.models.tx import TxCache from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.lock import Lock from cic_eth.db.models.lock import Lock
from cic_eth.db import SessionBase from cic_eth.db import SessionBase
from cic_eth.db.enum import StatusEnum from cic_eth.db.enum import (
from cic_eth.db.enum import LockEnum StatusEnum,
LockEnum,
StatusBits,
is_alive,
)
from cic_eth.eth.util import unpack_signed_raw_tx # TODO: should not be in same sub-path as package that imports queue.tx from cic_eth.eth.util import unpack_signed_raw_tx # TODO: should not be in same sub-path as package that imports queue.tx
from cic_eth.error import NotLocalTxError from cic_eth.error import NotLocalTxError
from cic_eth.error import LockedError from cic_eth.error import LockedError
@ -70,10 +75,7 @@ def create(nonce, holder_address, tx_hash, signed_tx, chain_str, obsolete_predec
for otx in q.all(): for otx in q.all():
logg.info('otx {} obsoleted by {}'.format(otx.tx_hash, tx_hash)) logg.info('otx {} obsoleted by {}'.format(otx.tx_hash, tx_hash))
if otx.status == StatusEnum.SENT: otx.cancel(confirmed=False, session=session)
otx.cancel(False, session=session)
elif otx.status != StatusEnum.OBSOLETED:
otx.override(session=session)
session.commit() session.commit()
session.close() session.close()
@ -167,6 +169,7 @@ def set_final_status(tx_hash, block=None, fail=False):
return tx_hash return tx_hash
@celery_app.task() @celery_app.task()
def set_cancel(tx_hash, manual=False): def set_cancel(tx_hash, manual=False):
"""Used to set the status when a transaction is cancelled. """Used to set the status when a transaction is cancelled.
@ -250,6 +253,33 @@ def set_fubar(tx_hash):
return tx_hash return tx_hash
@celery_app.task()
def set_manual(tx_hash):
"""Used to set the status when queue is manually changed
Will set the state to MANUAL
:param tx_hash: Transaction hash of record to modify
:type tx_hash: str, 0x-hex
:raises NotLocalTxError: If transaction not found in queue.
"""
session = SessionBase.create_session()
o = session.query(Otx).filter(Otx.tx_hash==tx_hash).first()
if o == None:
session.close()
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
session.flush()
o.manual(session=session)
session.commit()
session.close()
return tx_hash
@celery_app.task() @celery_app.task()
def set_ready(tx_hash): def set_ready(tx_hash):
"""Used to mark a transaction as ready to be sent to network """Used to mark a transaction as ready to be sent to network
@ -265,14 +295,11 @@ def set_ready(tx_hash):
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash)) raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
session.flush() session.flush()
if o.status == StatusEnum.WAITFORGAS or o.status == StatusEnum.PENDING: if o.status & StatusBits.GAS_ISSUES or o.status == StatusEnum.PENDING:
o.readysend(session=session) o.readysend(session=session)
else: else:
o.retry(session=session) o.retry(session=session)
logg.debug('ot otx otx {} {}'.format(tx_hash, o))
session.add(o)
session.commit() session.commit()
session.close() session.close()
@ -304,6 +331,7 @@ def set_waitforgas(tx_hash):
return tx_hash return tx_hash
@celery_app.task() @celery_app.task()
def get_state_log(tx_hash): def get_state_log(tx_hash):
@ -483,13 +511,14 @@ def get_paused_txs(status=None, sender=None, chain_id=0):
q = session.query(Otx) q = session.query(Otx)
if status != None: if status != None:
if status == StatusEnum.PENDING or status >= StatusEnum.SENT: #if status == StatusEnum.PENDING or status >= StatusEnum.SENT:
if status == StatusEnum.PENDING or status & StatusBits.IN_NETWORK or not is_alive(status):
raise ValueError('not a valid paused tx value: {}'.format(status)) raise ValueError('not a valid paused tx value: {}'.format(status))
q = q.filter(Otx.status==status) q = q.filter(Otx.status.op('&')(status.value)==status.value)
q = q.join(TxCache) q = q.join(TxCache)
else: else:
q = q.filter(Otx.status>StatusEnum.PENDING) q = q.filter(Otx.status>StatusEnum.PENDING.value)
q = q.filter(Otx.status<StatusEnum.SENT) q = q.filter(not_(Otx.status.op('&')(StatusBits.IN_NETWORK.value)>0))
if sender != None: if sender != None:
q = q.filter(TxCache.sender==sender) q = q.filter(TxCache.sender==sender)
@ -508,7 +537,7 @@ def get_paused_txs(status=None, sender=None, chain_id=0):
return txs return txs
def get_status_tx(status, before=None, limit=0): def get_status_tx(status, before=None, exact=False, limit=0):
"""Retrieve transaction with a specific queue status. """Retrieve transaction with a specific queue status.
:param status: Status to match transactions with :param status: Status to match transactions with
@ -525,7 +554,10 @@ def get_status_tx(status, before=None, limit=0):
q = session.query(Otx) q = session.query(Otx)
q = q.join(TxCache) q = q.join(TxCache)
q = q.filter(TxCache.date_updated<before) q = q.filter(TxCache.date_updated<before)
q = q.filter(Otx.status==status) if exact:
q = q.filter(Otx.status==status.value)
else:
q = q.filter(Otx.status.op('&')(status.value)==status.value)
i = 0 i = 0
for o in q.all(): for o in q.all():
if limit > 0 and i == limit: if limit > 0 and i == limit:
@ -565,9 +597,12 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
q_outer = q_outer.join(Lock, isouter=True) q_outer = q_outer.join(Lock, isouter=True)
q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0)) q_outer = q_outer.filter(or_(Lock.flags==None, Lock.flags.op('&')(LockEnum.SEND.value)==0))
if status >= StatusEnum.SENT: if not is_alive(status):
raise ValueError('not a valid non-final tx value: {}'.format(s)) raise ValueError('not a valid non-final tx value: {}'.format(status))
if status == StatusEnum.PENDING:
q_outer = q_outer.filter(Otx.status==status.value) q_outer = q_outer.filter(Otx.status==status.value)
else:
q_outer = q_outer.filter(Otx.status.op('&')(status.value)==status.value)
if recipient != None: if recipient != None:
q_outer = q_outer.filter(TxCache.recipient==recipient) q_outer = q_outer.filter(TxCache.recipient==recipient)

View File

@ -2,7 +2,7 @@ web3==5.12.2
celery==4.4.7 celery==4.4.7
crypto-dev-signer~=0.4.13rc2 crypto-dev-signer~=0.4.13rc2
confini~=0.3.6b1 confini~=0.3.6b1
cic-registry~=0.5.3a10 cic-registry~=0.5.3a12
cic-bancor~=0.0.6 cic-bancor~=0.0.6
redis==3.5.3 redis==3.5.3
alembic==1.4.2 alembic==1.4.2

View File

@ -10,7 +10,11 @@ import web3
# local imports # local imports
from cic_eth.api import AdminApi from cic_eth.api import AdminApi
from cic_eth.db.models.role import AccountRole from cic_eth.db.models.role import AccountRole
from cic_eth.db.enum import StatusEnum from cic_eth.db.enum import (
StatusEnum,
StatusBits,
status_str,
)
from cic_eth.error import InitializationError from cic_eth.error import InitializationError
from cic_eth.eth.task import sign_and_register_tx from cic_eth.eth.task import sign_and_register_tx
from cic_eth.eth.tx import cache_gas_refill_data from cic_eth.eth.tx import cache_gas_refill_data
@ -64,7 +68,11 @@ def test_resend_inplace(
api = AdminApi(c, queue=None) api = AdminApi(c, queue=None)
t = api.resend(tx_dict['hash'], chain_str, unlock=True) t = api.resend(tx_dict['hash'], chain_str, unlock=True)
tx_hash_new_hex = t.get() t.get()
i = 0
tx_hash_new_hex = None
for r in t.collect():
tx_hash_new_hex = r[1]
assert t.successful() assert t.successful()
tx_raw_new = get_tx(tx_hash_new_hex) tx_raw_new = get_tx(tx_hash_new_hex)
@ -74,142 +82,144 @@ def test_resend_inplace(
assert tx_dict_new['gasPrice'] > gas_price_before assert tx_dict_new['gasPrice'] > gas_price_before
tx_dict_after = get_tx(tx_dict['hash']) tx_dict_after = get_tx(tx_dict['hash'])
assert tx_dict_after['status'] == StatusEnum.OVERRIDDEN
logg.debug('logggg {}'.format(status_str(tx_dict_after['status'])))
assert tx_dict_after['status'] & StatusBits.MANUAL
def test_check_fix_nonce( #def test_check_fix_nonce(
default_chain_spec, # default_chain_spec,
init_database, # init_database,
init_eth_account_roles, # init_eth_account_roles,
init_w3, # init_w3,
eth_empty_accounts, # eth_empty_accounts,
celery_session_worker, # celery_session_worker,
): # ):
#
chain_str = str(default_chain_spec) # chain_str = str(default_chain_spec)
#
sigs = [] # sigs = []
for i in range(5): # for i in range(5):
s = celery.signature( # s = celery.signature(
'cic_eth.eth.tx.refill_gas', # 'cic_eth.eth.tx.refill_gas',
[ # [
eth_empty_accounts[i], # eth_empty_accounts[i],
chain_str, # chain_str,
], # ],
queue=None, # queue=None,
) # )
sigs.append(s) # sigs.append(s)
#
t = celery.group(sigs)() # t = celery.group(sigs)()
txs = t.get() # txs = t.get()
assert t.successful() # assert t.successful()
#
tx_hash = web3.Web3.keccak(hexstr=txs[2]) # tx_hash = web3.Web3.keccak(hexstr=txs[2])
c = RpcClient(default_chain_spec) # c = RpcClient(default_chain_spec)
api = AdminApi(c, queue=None) # api = AdminApi(c, queue=None)
address = init_eth_account_roles['eth_account_gas_provider'] # address = init_eth_account_roles['eth_account_gas_provider']
nonce_spec = api.check_nonce(address) # nonce_spec = api.check_nonce(address)
assert nonce_spec['nonce']['network'] == 0 # assert nonce_spec['nonce']['network'] == 0
assert nonce_spec['nonce']['queue'] == 4 # assert nonce_spec['nonce']['queue'] == 4
assert nonce_spec['nonce']['blocking'] == None # assert nonce_spec['nonce']['blocking'] == None
#
s_set = celery.signature( # s_set = celery.signature(
'cic_eth.queue.tx.set_rejected', # 'cic_eth.queue.tx.set_rejected',
[ # [
tx_hash.hex(), # tx_hash.hex(),
], # ],
queue=None, # queue=None,
) # )
t = s_set.apply_async() # t = s_set.apply_async()
t.get() # t.get()
t.collect() # t.collect()
assert t.successful() # assert t.successful()
#
#
nonce_spec = api.check_nonce(address) # nonce_spec = api.check_nonce(address)
assert nonce_spec['nonce']['blocking'] == 2 # assert nonce_spec['nonce']['blocking'] == 2
assert nonce_spec['tx']['blocking'] == tx_hash.hex() # assert nonce_spec['tx']['blocking'] == tx_hash.hex()
#
t = api.fix_nonce(address, nonce_spec['nonce']['blocking']) # t = api.fix_nonce(address, nonce_spec['nonce']['blocking'])
t.get() # t.get()
t.collect() # t.collect()
assert t.successful() # assert t.successful()
#
for tx in txs[3:]: # for tx in txs[3:]:
tx_hash = web3.Web3.keccak(hexstr=tx) # tx_hash = web3.Web3.keccak(hexstr=tx)
tx_dict = get_tx(tx_hash.hex()) # tx_dict = get_tx(tx_hash.hex())
assert tx_dict['status'] == StatusEnum.OVERRIDDEN # assert tx_dict['status'] == StatusEnum.OVERRIDDEN
#
#
def test_tag_account( #def test_tag_account(
init_database, # init_database,
eth_empty_accounts, # eth_empty_accounts,
init_rpc, # init_rpc,
): # ):
#
api = AdminApi(init_rpc) # api = AdminApi(init_rpc)
#
api.tag_account('foo', eth_empty_accounts[0]) # api.tag_account('foo', eth_empty_accounts[0])
api.tag_account('bar', eth_empty_accounts[1]) # api.tag_account('bar', eth_empty_accounts[1])
api.tag_account('bar', eth_empty_accounts[2]) # api.tag_account('bar', eth_empty_accounts[2])
#
assert AccountRole.get_address('foo') == eth_empty_accounts[0] # assert AccountRole.get_address('foo') == eth_empty_accounts[0]
assert AccountRole.get_address('bar') == eth_empty_accounts[2] # assert AccountRole.get_address('bar') == eth_empty_accounts[2]
#
#
def test_ready( #def test_ready(
init_database, # init_database,
eth_empty_accounts, # eth_empty_accounts,
init_rpc, # init_rpc,
w3, # w3,
): # ):
#
api = AdminApi(init_rpc) # api = AdminApi(init_rpc)
#
with pytest.raises(InitializationError): # with pytest.raises(InitializationError):
api.ready() # api.ready()
#
bogus_account = os.urandom(20) # bogus_account = os.urandom(20)
bogus_account_hex = '0x' + bogus_account.hex() # bogus_account_hex = '0x' + bogus_account.hex()
#
api.tag_account('ETH_GAS_PROVIDER_ADDRESS', web3.Web3.toChecksumAddress(bogus_account_hex)) # api.tag_account('ETH_GAS_PROVIDER_ADDRESS', web3.Web3.toChecksumAddress(bogus_account_hex))
with pytest.raises(KeyError): # with pytest.raises(KeyError):
api.ready() # api.ready()
#
api.tag_account('ETH_GAS_PROVIDER_ADDRESS', eth_empty_accounts[0]) # api.tag_account('ETH_GAS_PROVIDER_ADDRESS', eth_empty_accounts[0])
api.ready() # api.ready()
#
#
def test_tx( #def test_tx(
default_chain_spec, # default_chain_spec,
cic_registry, # cic_registry,
init_database, # init_database,
init_rpc, # init_rpc,
init_w3, # init_w3,
celery_session_worker, # celery_session_worker,
): # ):
#
tx = { # tx = {
'from': init_w3.eth.accounts[0], # 'from': init_w3.eth.accounts[0],
'to': init_w3.eth.accounts[1], # 'to': init_w3.eth.accounts[1],
'nonce': 42, # 'nonce': 42,
'gas': 21000, # 'gas': 21000,
'gasPrice': 1000000, # 'gasPrice': 1000000,
'value': 128, # 'value': 128,
'chainId': default_chain_spec.chain_id(), # 'chainId': default_chain_spec.chain_id(),
'data': '', # 'data': '',
} # }
#
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec)) # (tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, str(default_chain_spec))
queue_create( # queue_create(
tx['nonce'], # tx['nonce'],
tx['from'], # tx['from'],
tx_hash_hex, # tx_hash_hex,
tx_signed_raw_hex, # tx_signed_raw_hex,
str(default_chain_spec), # str(default_chain_spec),
) # )
tx_recovered = unpack_signed_raw_tx(bytes.fromhex(tx_signed_raw_hex[2:]), default_chain_spec.chain_id()) # tx_recovered = unpack_signed_raw_tx(bytes.fromhex(tx_signed_raw_hex[2:]), default_chain_spec.chain_id())
cache_gas_refill_data(tx_hash_hex, tx_recovered) # cache_gas_refill_data(tx_hash_hex, tx_recovered)
#
api = AdminApi(init_rpc, queue=None) # api = AdminApi(init_rpc, queue=None)
tx = api.tx(default_chain_spec, tx_hash=tx_hash_hex) # tx = api.tx(default_chain_spec, tx_hash=tx_hash_hex)

View File

@ -10,7 +10,10 @@ from cic_registry import zero_address
# local imports # local imports
from cic_eth.db.models.otx import Otx from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache from cic_eth.db.models.tx import TxCache
from cic_eth.db.enum import StatusEnum from cic_eth.db.enum import (
StatusEnum,
StatusBits,
)
logg = logging.getLogger() logg = logging.getLogger()
@ -169,6 +172,9 @@ def test_status_fubar(
) )
t = s.apply_async() t = s.apply_async()
t.get() t.get()
for n in t.collect():
pass
assert t.successful() assert t.successful()
init_database.refresh(otx)
assert otx.status == StatusEnum.FUBAR otx = Otx.load(tx_hash)
assert otx.status & StatusBits.UNKNOWN_ERROR

View File

@ -8,7 +8,11 @@ import celery
# local imports # local imports
from cic_eth.db.models.base import SessionBase from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.otx import Otx from cic_eth.db.models.otx import Otx
from cic_eth.db.enum import StatusEnum from cic_eth.db.enum import (
StatusEnum,
StatusBits,
is_error_status,
)
from cic_eth.eth.task import sign_and_register_tx from cic_eth.eth.task import sign_and_register_tx
logg = logging.getLogger() logg = logging.getLogger()
@ -101,7 +105,7 @@ def test_states_failed(
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first() otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first()
otx.sendfail(session=init_database) otx.sendfail(session=init_database)
init_database.add(otx)
init_database.commit() init_database.commit()
s = celery.signature( s = celery.signature(
@ -121,5 +125,9 @@ def test_states_failed(
pass pass
assert t.successful() assert t.successful()
init_database.commit()
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first() otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hash_hex).first()
assert otx.status == StatusEnum.RETRY.value assert otx.status & StatusEnum.RETRY == StatusEnum.RETRY
#assert otx.status & StatusBits.QUEUED
assert is_error_status(otx.status)

View File

@ -24,7 +24,6 @@ class Response:
status = 200 status = 200
@pytest.mark.skip()
def test_callback_http( def test_callback_http(
celery_session_worker, celery_session_worker,
mocker, mocker,
@ -43,7 +42,6 @@ def test_callback_http(
t.get() t.get()
@pytest.mark.skip()
def test_callback_tcp( def test_callback_tcp(
celery_session_worker, celery_session_worker,
): ):

View File

@ -0,0 +1,20 @@
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
status_str,
)
def test_status_str():
# String representation for a status in StatusEnum
s = status_str(StatusEnum.REVERTED)
assert s == 'REVERTED'
# String representation for a status not in StatusEnum
s = status_str(StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR)
assert s == 'LOCAL_ERROR,NODE_ERROR*'
# String representation for a status in StatusEnum, but bits only representation bit set
s = status_str(StatusEnum.REVERTED, bits_only=True)
assert s == 'IN_NETWORK,NETWORK_ERROR,FINAL'

View File

@ -9,7 +9,11 @@ import pytest
from cic_eth.db.models.base import SessionBase from cic_eth.db.models.base import SessionBase
from cic_eth.db.models.otx import OtxStateLog from cic_eth.db.models.otx import OtxStateLog
from cic_eth.db.models.otx import Otx from cic_eth.db.models.otx import Otx
from cic_eth.db.enum import StatusEnum from cic_eth.db.enum import (
StatusEnum,
StatusBits,
is_alive,
)
logg = logging.getLogger() logg = logging.getLogger()
@ -70,8 +74,16 @@ def test_state_log(
otx = Otx.add(0, address, tx_hash, signed_tx, session=init_database) otx = Otx.add(0, address, tx_hash, signed_tx, session=init_database)
otx.waitforgas(session=init_database) otx.waitforgas(session=init_database)
init_database.commit()
otx.readysend(session=init_database)
init_database.commit()
otx.sent(session=init_database) otx.sent(session=init_database)
init_database.commit()
otx.success(1024, session=init_database) otx.success(1024, session=init_database)
init_database.commit()
q = init_database.query(OtxStateLog) q = init_database.query(OtxStateLog)
q = q.filter(OtxStateLog.otx_id==otx.id) q = q.filter(OtxStateLog.otx_id==otx.id)
@ -80,5 +92,6 @@ def test_state_log(
assert logs[0].status == StatusEnum.PENDING assert logs[0].status == StatusEnum.PENDING
assert logs[1].status == StatusEnum.WAITFORGAS assert logs[1].status == StatusEnum.WAITFORGAS
assert logs[2].status == StatusEnum.SENT assert logs[2].status & StatusBits.QUEUED
assert logs[3].status == StatusEnum.SUCCESS assert logs[3].status & StatusBits.IN_NETWORK
assert not is_alive(logs[4].status)

View File

@ -1,55 +0,0 @@
# standard imports
import logging
# third-party imports
import pytest
# local imports
from cic_eth.db import Otx
from cic_eth.db.error import TxStateChangeError
logg = logging.getLogger()
# Check that invalid transitions throw exceptions
# sent
def test_db_queue_states(
init_database,
):
session = init_database
# these values are completely arbitary
tx_hash = '0xF182DFA3AD48723E7E222FE7B4C2C44C23CD4D7FF413E8999DFA15ECE53F'
address = '0x38C5559D6EDDDA1F705D3AB1A664CA1B397EB119'
signed_tx = '0xA5866A5383249AE843546BDA46235A1CA1614F538FB486140693C2EF1956FC53213F6AEF0F99F44D7103871AF3A12B126DCF9BFB7AF11143FAB3ECE2B452EE35D1320C4C7C6F999C8DF4EB09E729715B573F6672ED852547F552C4AE99D17DCD14C810'
o = Otx(
nonce=42,
address=address[2:],
tx_hash=tx_hash[2:],
signed_tx=signed_tx[2:],
)
session.add(o)
session.commit()
o.sent(session=session)
session.commit()
# send after sent is ok
o.sent(session=session)
session.commit()
o.sendfail(session=session)
session.commit()
with pytest.raises(TxStateChangeError):
o.sendfail(session=session)
o.sent(session=session)
session.commit()
o.minefail(1234, session=session)
session.commit()
with pytest.raises(TxStateChangeError):
o.sent(session=session)

View File

@ -0,0 +1,97 @@
# standard imports
import os
# third-party imports
import pytest
# local imports
from cic_eth.db.models.otx import Otx
from cic_eth.db.enum import (
StatusEnum,
StatusBits,
is_alive,
)
@pytest.fixture(scope='function')
def otx(
init_database,
):
bogus_hash = '0x' + os.urandom(32).hex()
bogus_address = '0x' + os.urandom(20).hex()
bogus_tx_raw = '0x' + os.urandom(128).hex()
return Otx(0, bogus_address, bogus_hash, bogus_tx_raw)
def test_status_chain_gas(
init_database,
otx,
):
otx.waitforgas(init_database)
otx.readysend(init_database)
otx.sent(init_database)
otx.success(1024, init_database)
assert not is_alive(otx.status)
def test_status_chain_straight_success(
init_database,
otx,
):
otx.readysend(init_database)
otx.sent(init_database)
otx.success(1024, init_database)
assert not is_alive(otx.status)
def test_status_chain_straight_revert(
init_database,
otx,
):
otx.readysend(init_database)
otx.sent(init_database)
otx.minefail(1024, init_database)
assert not is_alive(otx.status)
def test_status_chain_nodeerror(
init_database,
otx,
):
otx.readysend(init_database)
otx.sendfail(init_database)
otx.retry(init_database)
otx.sent(init_database)
otx.success(1024, init_database)
assert not is_alive(otx.status)
def test_status_chain_nodeerror_multiple(
init_database,
otx,
):
otx.readysend(init_database)
otx.sendfail(init_database)
otx.retry(init_database)
otx.sendfail(init_database)
otx.retry(init_database)
otx.sent(init_database)
otx.success(1024, init_database)
assert not is_alive(otx.status)
def test_status_chain_nodeerror(
init_database,
otx,
):
otx.readysend(init_database)
otx.reject(init_database)
assert not is_alive(otx.status)

View File

@ -16,8 +16,14 @@ from cic_eth.db.models.otx import OtxSync
from cic_eth.db.models.tx import TxCache from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.lock import Lock from cic_eth.db.models.lock import Lock
from cic_eth.db.models.base import SessionBase from cic_eth.db.models.base import SessionBase
from cic_eth.db.enum import StatusEnum from cic_eth.db.enum import (
from cic_eth.db.enum import LockEnum StatusEnum,
LockEnum,
StatusBits,
is_alive,
is_error_status,
status_str,
)
from cic_eth.queue.tx import create as queue_create from cic_eth.queue.tx import create as queue_create
from cic_eth.queue.tx import set_final_status from cic_eth.queue.tx import set_final_status
from cic_eth.queue.tx import set_sent_status from cic_eth.queue.tx import set_sent_status
@ -63,13 +69,14 @@ def test_finalize(
set_sent_status(tx_hash.hex()) set_sent_status(tx_hash.hex())
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[0]).first() otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[0]).first()
assert otx.status == StatusEnum.OBSOLETED assert otx.status & StatusBits.OBSOLETE
assert not is_alive(otx.status)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[1]).first() otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[1]).first()
assert otx.status == StatusEnum.OBSOLETED assert otx.status & StatusBits.OBSOLETE
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[2]).first() otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[2]).first()
assert otx.status == StatusEnum.OBSOLETED assert otx.status & StatusBits.OBSOLETE
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[3]).first() otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[3]).first()
assert otx.status == StatusEnum.PENDING assert otx.status == StatusEnum.PENDING
@ -82,19 +89,22 @@ def test_finalize(
set_final_status(tx_hashes[3], 1024) set_final_status(tx_hashes[3], 1024)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[0]).first() otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[0]).first()
assert otx.status == StatusEnum.CANCELLED assert otx.status & (StatusBits.OBSOLETE | StatusBits.FINAL)
assert not is_alive(otx.status)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[1]).first() otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[1]).first()
assert otx.status == StatusEnum.CANCELLED assert otx.status & (StatusBits.OBSOLETE | StatusBits.FINAL)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[2]).first() otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[2]).first()
assert otx.status == StatusEnum.CANCELLED assert otx.status & (StatusBits.OBSOLETE | StatusBits.FINAL)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[3]).first() otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[3]).first()
assert otx.status == StatusEnum.SUCCESS assert otx.status & (StatusBits.IN_NETWORK | StatusBits.FINAL)
assert not is_error_status(otx.status)
otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[4]).first() otx = init_database.query(Otx).filter(Otx.tx_hash==tx_hashes[4]).first()
assert otx.status == StatusEnum.SENT assert otx.status & (StatusBits.IN_NETWORK | StatusBits.FINAL)
assert not is_error_status(otx.status)
def test_expired( def test_expired(
@ -404,7 +414,7 @@ def test_obsoletion(
session = SessionBase.create_session() session = SessionBase.create_session()
q = session.query(Otx) q = session.query(Otx)
q = q.filter(Otx.status==StatusEnum.OBSOLETED) q = q.filter(Otx.status.op('&')(StatusEnum.OBSOLETED.value)==StatusEnum.OBSOLETED.value)
z = 0 z = 0
for o in q.all(): for o in q.all():
z += o.nonce z += o.nonce
@ -416,13 +426,13 @@ def test_obsoletion(
session = SessionBase.create_session() session = SessionBase.create_session()
q = session.query(Otx) q = session.query(Otx)
q = q.filter(Otx.status==StatusEnum.OBSOLETED) q = q.filter(Otx.status.op('&')(StatusEnum.CANCELLED.value)==StatusEnum.OBSOLETED.value)
zo = 0 zo = 0
for o in q.all(): for o in q.all():
zo += o.nonce zo += o.nonce
q = session.query(Otx) q = session.query(Otx)
q = q.filter(Otx.status==StatusEnum.CANCELLED) q = q.filter(Otx.status.op('&')(StatusEnum.CANCELLED.value)==StatusEnum.CANCELLED.value)
zc = 0 zc = 0
for o in q.all(): for o in q.all():
zc += o.nonce zc += o.nonce
@ -450,16 +460,20 @@ def test_retry(
q = q.filter(Otx.tx_hash==tx_hash) q = q.filter(Otx.tx_hash==tx_hash)
otx = q.first() otx = q.first()
assert otx.status == StatusEnum.RETRY assert (otx.status & StatusEnum.RETRY.value) == StatusEnum.RETRY.value
assert is_error_status(otx.status)
set_sent_status(tx_hash, False) set_sent_status(tx_hash, False)
set_ready(tx_hash) set_ready(tx_hash)
init_database.commit()
q = init_database.query(Otx) q = init_database.query(Otx)
q = q.filter(Otx.tx_hash==tx_hash) q = q.filter(Otx.tx_hash==tx_hash)
otx = q.first() otx = q.first()
assert otx.status == StatusEnum.RETRY assert (otx.status & StatusEnum.RETRY.value) == StatusBits.QUEUED.value
assert not is_error_status(otx.status)
def test_get_account_tx( def test_get_account_tx(