Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9eb1be810a
|
||
|
|
4612984269
|
||
|
|
c4e58ad784
|
||
|
|
6d6a1a0d45
|
||
|
|
8b70a509f7
|
||
|
|
4c520e3433
|
||
|
|
70f03d1b9d
|
||
|
|
e5c8af1aaa
|
@@ -168,11 +168,10 @@ class Otx(SessionBase):
|
|||||||
status = status_str(self.status)
|
status = status_str(self.status)
|
||||||
SessionBase.release_session(session)
|
SessionBase.release_session(session)
|
||||||
raise TxStateChangeError('FUBAR cannot be set on an entry with an error state already set ({})'.format(status))
|
raise TxStateChangeError('FUBAR cannot be set on an entry with an error state already set ({})'.format(status))
|
||||||
if not self.status & StatusBits.RESERVED:
|
# if not self.status & StatusBits.RESERVED:
|
||||||
status = status_str(self.status)
|
# status = status_str(self.status)
|
||||||
SessionBase.release_session(session)
|
# SessionBase.release_session(session)
|
||||||
raise TxStateChangeError('FUBAR on tx that has not been RESERVED ({})'.format(status))
|
# raise TxStateChangeError('FUBAR on tx that has not been RESERVED ({})'.format(status))
|
||||||
|
|
||||||
|
|
||||||
self.__set_status(StatusBits.UNKNOWN_ERROR | StatusBits.FINAL, session)
|
self.__set_status(StatusBits.UNKNOWN_ERROR | StatusBits.FINAL, session)
|
||||||
self.__reset_status(StatusBits.QUEUED | StatusBits.RESERVED, session)
|
self.__reset_status(StatusBits.QUEUED | StatusBits.RESERVED, session)
|
||||||
@@ -239,9 +238,11 @@ class Otx(SessionBase):
|
|||||||
SessionBase.release_session(session)
|
SessionBase.release_session(session)
|
||||||
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status))
|
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ({})'.format(status))
|
||||||
if self.status & StatusBits.IN_NETWORK:
|
if self.status & StatusBits.IN_NETWORK:
|
||||||
|
status = status_str(self.status)
|
||||||
SessionBase.release_session(session)
|
SessionBase.release_session(session)
|
||||||
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already IN_NETWORK ({})'.format(status))
|
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already IN_NETWORK ({})'.format(status))
|
||||||
if self.status & StatusBits.OBSOLETE:
|
if self.status & StatusBits.OBSOLETE:
|
||||||
|
status = status_str(self.status)
|
||||||
SessionBase.release_session(session)
|
SessionBase.release_session(session)
|
||||||
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already OBSOLETE ({})'.format(status))
|
raise TxStateChangeError('OVERRIDDEN/OBSOLETED cannot be set on an entry already OBSOLETE ({})'.format(status))
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
# standard imports
|
# standard imports
|
||||||
import enum
|
import enum
|
||||||
|
import re
|
||||||
|
|
||||||
|
|
||||||
@enum.unique
|
@enum.unique
|
||||||
@@ -75,6 +76,13 @@ class StatusEnum(enum.IntEnum):
|
|||||||
"""Execution of transaction on network completed and was successful"""
|
"""Execution of transaction on network completed and was successful"""
|
||||||
|
|
||||||
|
|
||||||
|
status_mask = 0
|
||||||
|
r_bitmask = '^[A-Z][A-Z_]*$'
|
||||||
|
for v in dir(StatusBits):
|
||||||
|
if re.match(r_bitmask, v):
|
||||||
|
status_mask |= getattr(StatusBits, v).value
|
||||||
|
|
||||||
|
|
||||||
def status_str(v, bits_only=False):
|
def status_str(v, bits_only=False):
|
||||||
"""Render a human-readable string describing the status
|
"""Render a human-readable string describing the status
|
||||||
|
|
||||||
@@ -189,3 +197,7 @@ def is_alive(v):
|
|||||||
:rtype: bool
|
:rtype: bool
|
||||||
"""
|
"""
|
||||||
return bool(v & dead() == 0)
|
return bool(v & dead() == 0)
|
||||||
|
|
||||||
|
|
||||||
|
def status_all():
|
||||||
|
return status_mask
|
||||||
|
|||||||
@@ -153,8 +153,9 @@ def get_nonce_tx_cache(chain_spec, nonce, sender, decoder=None, session=None):
|
|||||||
|
|
||||||
return txs
|
return txs
|
||||||
|
|
||||||
|
# TODO: Query for all noncea that are not yet finalized
|
||||||
def get_paused_tx_cache(chain_spec, status=None, sender=None, session=None, decoder=None):
|
# select otx.nonce, bit_or(status) as statusaggr from otx inner join tx_cache on otx.id = tx_cache.otx_id where sender = '9d8ce82642c18eb141b269cbc12870a1d2cc3a1f' group by otx.nonce having bit_or(status) & 4096 = 0;
|
||||||
|
def get_paused_tx_cache(chain_spec, status=None, sender=None, session=None, decoder=None, exclude_obsolete=True):
|
||||||
"""Returns not finalized transactions that have been attempted sent without success.
|
"""Returns not finalized transactions that have been attempted sent without success.
|
||||||
|
|
||||||
:param chain_spec: Chain spec for transaction network
|
:param chain_spec: Chain spec for transaction network
|
||||||
@@ -182,10 +183,12 @@ def get_paused_tx_cache(chain_spec, status=None, sender=None, session=None, deco
|
|||||||
q = q.join(TxCache)
|
q = q.join(TxCache)
|
||||||
else:
|
else:
|
||||||
q = q.filter(Otx.status>StatusEnum.PENDING.value)
|
q = q.filter(Otx.status>StatusEnum.PENDING.value)
|
||||||
q = q.filter(not_(Otx.status.op('&')(StatusBits.IN_NETWORK.value)>0))
|
q = q.filter(not_(Otx.status.op('&')(StatusBits.IN_NETWORK.value)==0))
|
||||||
|
q = q.filter(not_(Otx.status.op('&')(StatusBits.FINAL.value)==0))
|
||||||
if sender != None:
|
if sender != None:
|
||||||
q = q.filter(TxCache.sender==sender)
|
q = q.filter(TxCache.sender==sender)
|
||||||
|
if exclude_obsolete:
|
||||||
|
q = q.filter(Otx.status.op('&')(StatusBits.OBSOLETE.value)==0)
|
||||||
|
|
||||||
txs = {}
|
txs = {}
|
||||||
gas = 0
|
gas = 0
|
||||||
@@ -207,7 +210,7 @@ def get_paused_tx_cache(chain_spec, status=None, sender=None, session=None, deco
|
|||||||
return txs
|
return txs
|
||||||
|
|
||||||
|
|
||||||
def get_status_tx_cache(chain_spec, status, not_status=None, before=None, exact=False, limit=0, session=None, decoder=None):
|
def get_status_tx_cache(chain_spec, status, not_status=None, before=None, exact=False, compare_checked=False, limit=0, session=None, decoder=None):
|
||||||
"""Retrieve transaction with a specific queue status.
|
"""Retrieve transaction with a specific queue status.
|
||||||
|
|
||||||
:param chain_spec: Chain spec for transaction network
|
:param chain_spec: Chain spec for transaction network
|
||||||
@@ -232,7 +235,10 @@ def get_status_tx_cache(chain_spec, status, not_status=None, before=None, exact=
|
|||||||
q = session.query(Otx)
|
q = session.query(Otx)
|
||||||
q = q.join(TxCache)
|
q = q.join(TxCache)
|
||||||
if before != None:
|
if before != None:
|
||||||
q = q.filter(Otx.date_updated<before)
|
if compare_checked:
|
||||||
|
q = q.filter(Otx.date_updated<before)
|
||||||
|
else:
|
||||||
|
q = q.filter(TxCache.date_checked<before)
|
||||||
if exact:
|
if exact:
|
||||||
q = q.filter(Otx.status==status)
|
q = q.filter(Otx.status==status)
|
||||||
else:
|
else:
|
||||||
@@ -340,7 +346,7 @@ def get_upcoming_tx(chain_spec, status=StatusEnum.READYSEND, not_status=None, re
|
|||||||
return txs
|
return txs
|
||||||
|
|
||||||
|
|
||||||
def sql_range_filter(session, criteria=None):
|
def sql_range_filter(session, criteria=None, numeric_column='id'):
|
||||||
"""Convert an arbitrary type to a sql query range
|
"""Convert an arbitrary type to a sql query range
|
||||||
|
|
||||||
:param session: Backend state integrity session
|
:param session: Backend state integrity session
|
||||||
@@ -356,17 +362,30 @@ def sql_range_filter(session, criteria=None):
|
|||||||
if criteria == None:
|
if criteria == None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
original_criteria = criteria
|
||||||
|
|
||||||
|
if isinstance(criteria, str):
|
||||||
|
try:
|
||||||
|
criteria = datetime.datetime.fromisoformat(criteria)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
typ_str = 'hash'
|
||||||
if isinstance(criteria, str):
|
if isinstance(criteria, str):
|
||||||
q = session.query(Otx)
|
q = session.query(Otx)
|
||||||
q = q.filter(Otx.tx_hash==strip_0x(criteria))
|
q = q.filter(Otx.tx_hash==strip_0x(criteria))
|
||||||
r = q.first()
|
r = q.first()
|
||||||
if r == None:
|
if r == None:
|
||||||
raise NotLocalTxError('unknown tx hash as bound criteria specified: {}'.format(criteria))
|
raise NotLocalTxError('unknown tx hash as bound criteria specified: {}'.format(criteria))
|
||||||
boundary = ('id', r.id,)
|
boundary = (numeric_column, getattr(r, numeric_column),)
|
||||||
elif isinstance(criteria, int):
|
elif isinstance(criteria, int):
|
||||||
boundary = ('id', criteria,)
|
boundary = (numeric_column, criteria,)
|
||||||
|
typ_str = numeric_column
|
||||||
elif isinstance(criteria, datetime.datetime):
|
elif isinstance(criteria, datetime.datetime):
|
||||||
boundary = ('date', criteria,)
|
boundary = ('date', criteria,)
|
||||||
|
typ_str = 'datetime'
|
||||||
|
|
||||||
|
logg.debug('sql range specifier {} interpreted as {}: {}'.format(original_criteria, typ_str, str(criteria)))
|
||||||
|
|
||||||
return boundary
|
return boundary
|
||||||
|
|
||||||
@@ -414,8 +433,8 @@ def get_account_tx(chain_spec, address, as_sender=True, as_recipient=True, count
|
|||||||
session = SessionBase.bind_session(session)
|
session = SessionBase.bind_session(session)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
filter_offset = sql_range_filter(session, criteria=since)
|
filter_offset = sql_range_filter(session, criteria=since, numeric_column='nonce')
|
||||||
filter_limit = sql_range_filter(session, criteria=until)
|
filter_limit = sql_range_filter(session, criteria=until, numeric_column='nonce')
|
||||||
except NotLocalTxError as e:
|
except NotLocalTxError as e:
|
||||||
logg.error('query build failed: {}'.format(e))
|
logg.error('query build failed: {}'.format(e))
|
||||||
return {}
|
return {}
|
||||||
@@ -432,12 +451,16 @@ def get_account_tx(chain_spec, address, as_sender=True, as_recipient=True, count
|
|||||||
if filter_offset != None:
|
if filter_offset != None:
|
||||||
if filter_offset[0] == 'id':
|
if filter_offset[0] == 'id':
|
||||||
q = q.filter(Otx.id>=filter_offset[1])
|
q = q.filter(Otx.id>=filter_offset[1])
|
||||||
|
elif filter_offset[0] == 'nonce':
|
||||||
|
q = q.filter(Otx.nonce>=filter_offset[1])
|
||||||
elif filter_offset[0] == 'date':
|
elif filter_offset[0] == 'date':
|
||||||
q = q.filter(Otx.date_created>=filter_offset[1])
|
q = q.filter(Otx.date_created>=filter_offset[1])
|
||||||
|
|
||||||
if filter_limit != None:
|
if filter_limit != None:
|
||||||
if filter_limit[0] == 'id':
|
if filter_limit[0] == 'id':
|
||||||
q = q.filter(Otx.id<=filter_limit[1])
|
q = q.filter(Otx.id<=filter_limit[1])
|
||||||
|
elif filter_limit[0] == 'nonce':
|
||||||
|
q = q.filter(Otx.nonce<=filter_limit[1])
|
||||||
elif filter_limit[0] == 'date':
|
elif filter_limit[0] == 'date':
|
||||||
q = q.filter(Otx.date_created<=filter_limit[1])
|
q = q.filter(Otx.date_created<=filter_limit[1])
|
||||||
|
|
||||||
@@ -486,8 +509,8 @@ def get_latest_txs(chain_spec, count=10, since=None, until=None, status=None, n
|
|||||||
session = SessionBase.bind_session(session)
|
session = SessionBase.bind_session(session)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
filter_offset = sql_range_filter(session, criteria=since)
|
filter_offset = sql_range_filter(session, criteria=since, numeric_column='nonce')
|
||||||
filter_limit = sql_range_filter(session, criteria=until)
|
filter_limit = sql_range_filter(session, criteria=until, numeric_column='nonce')
|
||||||
except NotLocalTxError as e:
|
except NotLocalTxError as e:
|
||||||
logg.error('query build failed: {}'.format(e))
|
logg.error('query build failed: {}'.format(e))
|
||||||
return {}
|
return {}
|
||||||
@@ -498,12 +521,16 @@ def get_latest_txs(chain_spec, count=10, since=None, until=None, status=None, n
|
|||||||
if filter_offset != None:
|
if filter_offset != None:
|
||||||
if filter_offset[0] == 'id':
|
if filter_offset[0] == 'id':
|
||||||
q = q.filter(Otx.id>=filter_offset[1])
|
q = q.filter(Otx.id>=filter_offset[1])
|
||||||
|
if filter_offset[0] == 'nonce':
|
||||||
|
q = q.filter(Otx.nonce>=filter_offset[1])
|
||||||
elif filter_offset[0] == 'date':
|
elif filter_offset[0] == 'date':
|
||||||
q = q.filter(Otx.date_created>=filter_offset[1])
|
q = q.filter(Otx.date_created>=filter_offset[1])
|
||||||
|
|
||||||
if filter_limit != None:
|
if filter_limit != None:
|
||||||
if filter_limit[0] == 'id':
|
if filter_limit[0] == 'id':
|
||||||
q = q.filter(Otx.id<=filter_limit[1])
|
q = q.filter(Otx.id<=filter_limit[1])
|
||||||
|
elif filter_limit[0] == 'nonce':
|
||||||
|
q = q.filter(Otx.nonce<=filter_limit[1])
|
||||||
elif filter_limit[0] == 'date':
|
elif filter_limit[0] == 'date':
|
||||||
q = q.filter(Otx.date_created<=filter_limit[1])
|
q = q.filter(Otx.date_created<=filter_limit[1])
|
||||||
|
|
||||||
|
|||||||
@@ -405,7 +405,7 @@ def obsolete_by_cache(chain_spec, tx_hash, final, session=None):
|
|||||||
q = q.join(TxCache)
|
q = q.join(TxCache)
|
||||||
q = q.filter(Otx.nonce==nonce)
|
q = q.filter(Otx.nonce==nonce)
|
||||||
q = q.filter(TxCache.sender==sender)
|
q = q.filter(TxCache.sender==sender)
|
||||||
q = q.filter(Otx.tx_hash!=strip_0x(tx_hash))
|
q = q.filter(Otx.id!=otxid)
|
||||||
|
|
||||||
for otwo in q.all():
|
for otwo in q.all():
|
||||||
try:
|
try:
|
||||||
@@ -424,3 +424,33 @@ def obsolete_by_cache(chain_spec, tx_hash, final, session=None):
|
|||||||
SessionBase.release_session(session)
|
SessionBase.release_session(session)
|
||||||
|
|
||||||
return tx_hash
|
return tx_hash
|
||||||
|
|
||||||
|
|
||||||
|
def set_checked(chain_spec, tx_hash, session=None):
|
||||||
|
"""Set the checked date for the transaction to current datetime
|
||||||
|
|
||||||
|
:param chain_spec: Chain spec for transaction network
|
||||||
|
:type chain_spec: chainlib.chain.ChainSpec
|
||||||
|
:param tx_hash: Transaction hash of record to modify
|
||||||
|
:type tx_hash: str, 0x-hex
|
||||||
|
:param session: Backend state integrity session
|
||||||
|
:type session: varies
|
||||||
|
:raises NotLocalTxError: If transaction not found in queue.
|
||||||
|
:rtype: str
|
||||||
|
:returns: Transaction hash, in hex
|
||||||
|
"""
|
||||||
|
|
||||||
|
session = SessionBase.bind_session(session)
|
||||||
|
o = TxCache.load(tx_hash, session=session)
|
||||||
|
if o == None:
|
||||||
|
SessionBase.release_session(session)
|
||||||
|
raise NotLocalTxError('queue does not contain tx hash {}'.format(tx_hash))
|
||||||
|
|
||||||
|
session.flush()
|
||||||
|
|
||||||
|
o.check()
|
||||||
|
session.add(o)
|
||||||
|
session.commit()
|
||||||
|
SessionBase.release_session(session)
|
||||||
|
|
||||||
|
return tx_hash
|
||||||
|
|||||||
@@ -76,7 +76,7 @@ def create(chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_prede
|
|||||||
|
|
||||||
session.commit()
|
session.commit()
|
||||||
SessionBase.release_session(session)
|
SessionBase.release_session(session)
|
||||||
logg.debug('queue created nonce {} from {} hash {}'.format(nonce, holder_address, tx_hash))
|
logg.debug('queue created nonce {} from {} hash {} tx {}'.format(nonce, holder_address, tx_hash, signed_tx))
|
||||||
return tx_hash
|
return tx_hash
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,6 @@ hexathon~=0.1.0
|
|||||||
leveldir~=0.3.0
|
leveldir~=0.3.0
|
||||||
alembic==1.4.2
|
alembic==1.4.2
|
||||||
SQLAlchemy==1.3.20
|
SQLAlchemy==1.3.20
|
||||||
confini~=0.5.1
|
confini>=0.5.1,<0.7.0
|
||||||
pyxdg~=0.27
|
pyxdg~=0.27
|
||||||
chainlib~=0.0.12
|
chainlib>=0.1.0b1,<0.2.0
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[metadata]
|
[metadata]
|
||||||
name = chainqueue
|
name = chainqueue
|
||||||
version = 0.0.6rc3
|
version = 0.0.6rc9
|
||||||
description = Generic blockchain transaction queue control
|
description = Generic blockchain transaction queue control
|
||||||
author = Louis Holbrook
|
author = Louis Holbrook
|
||||||
author_email = dev@holbrook.no
|
author_email = dev@holbrook.no
|
||||||
|
|||||||
Reference in New Issue
Block a user