Remove unused block number property in tx-cache, add docstrings

This commit is contained in:
nolash 2021-08-27 09:44:12 +02:00
parent 08e23ae584
commit 0e458fd9f7
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
6 changed files with 228 additions and 72 deletions

View File

@ -1 +0,0 @@

View File

@ -10,12 +10,16 @@ from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
NullPool,
)
logg = logging.getLogger()
Model = declarative_base(name='Model')
CONNECTION_OVERFLOW_FACTOR = 3
CONNECTION_RECYCLE_AFTER = 60
class SessionBase(Model):
"""The base object for all SQLAlchemy enabled models. All other models must extend this.
@ -48,6 +52,9 @@ class SessionBase(Model):
@staticmethod
def _set_engine(engine):
"""Sets the database engine static property
:param engine: The sqlalchemy engine
:type engine: sqlalchemy.engine.Engine
"""
SessionBase.engine = engine
SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine)
@ -57,8 +64,20 @@ class SessionBase(Model):
def connect(dsn, pool_size=16, debug=False):
"""Create new database connection engine and connect to database backend.
:param dsn: DSN string defining connection.
The pool_size argument controls the behavior of the connection pool.
If the pool_size is greater than 1, and the engine has connection pool settings, The connection pool will be set up with the given number of connections. By default, it allows for 3x connection overflow (CONNECTION_OVERFLOW_FACTOR), and connection recycling after 60 seconds of inactivity (CONNECTION_RECYCLE_AFTER).
If the pool_size is 1 and debug mode is off, the StaticPool class (single connection pool) will be used. If debug is on, AssertionPool will be used (which raises assertionerror if more than a single connection is attempted at any one time by the process).
If the underlying engine does not have pooling capabilities, the pool_size parameter toggles the connection class used. If pool_size is set to 0, the NullPool will be used (build a new connection for every session). If pool_size is set to a positive number, the StaticPool will be used, keeping a single connection for all sessions.
:param dsn: DSN string defining connection
:type dsn: str
:param pool_size: Size of connection pool
:type pool_size: int
:param debug: Activate sql debug mode (outputs sql statements)
:type debug: bool
"""
e = None
if SessionBase.poolable:
@ -66,10 +85,10 @@ class SessionBase(Model):
if pool_size > 1:
e = create_engine(
dsn,
max_overflow=pool_size*3,
max_overflow=pool_size * CONNECTION_OVERFLOW_FACTOR,
pool_pre_ping=True,
pool_size=pool_size,
pool_recycle=60,
pool_recycle=CONNECTION_RECYCLE_AFTER,
poolclass=poolclass,
echo=debug,
)
@ -85,8 +104,12 @@ class SessionBase(Model):
echo=debug,
)
else:
pool_class = StaticPool
if pool_size < 1:
pool_class = NullPool
e = create_engine(
dsn,
poolclass=pool_class,
echo=debug,
)
@ -103,6 +126,17 @@ class SessionBase(Model):
@staticmethod
def bind_session(session=None):
"""Convenience function to enforce database session responsilibity in call stacks where it is unclear which layer will create a database session.
If the session argument is None, the method will create and return a new database session. A reference to the database session will be statically stored in the SessionBase class, and must be explicitly released with release_session.
When an existing session in passed as the argument, this method simply returns back the same session.
:param session: An sqlalchemy session
:type session: session.orm.Session
:rtype: session.orm.Session
:returns: An sqlalchemy session
"""
localsession = session
if localsession == None:
localsession = SessionBase.create_session()
@ -113,7 +147,12 @@ class SessionBase(Model):
@staticmethod
def release_session(session=None):
def release_session(session):
"""Checks if a reference to the given session exists in the SessionBase session store, and if it does commits the transaction and closes the session.
:param session: An sqlalchemy session
:type session: session.orm.Session
"""
session_key = str(id(session))
if SessionBase.localsessions.get(session_key) != None:
logg.debug('commit and destroy session {}'.format(session_key))

View File

@ -28,8 +28,6 @@ class Otx(SessionBase):
:param nonce: Transaction nonce
:type nonce: number
:param address: Ethereum address of recipient - NOT IN USE, REMOVE
:type address: str
:param tx_hash: Tranasction hash
:type tx_hash: str, 0x-hex
:param signed_tx: Signed raw transaction data
@ -41,12 +39,26 @@ class Otx(SessionBase):
"""Whether to enable queue state tracing"""
nonce = Column(Integer)
"""Transaction nonce"""
date_created = Column(DateTime, default=datetime.datetime.utcnow)
"""Datetime when record was created"""
date_updated = Column(DateTime, default=datetime.datetime.utcnow)
"""Datetime when record was last updated"""
tx_hash = Column(String(66))
"""Tranasction hash"""
signed_tx = Column(Text)
"""Signed raw transaction data"""
status = Column(Integer)
"""The status bit field of the transaction"""
block = Column(Integer)
"""The block number in which the transaction has been included"""
def __init__(self, nonce, tx_hash, signed_tx):
self.nonce = nonce
self.tx_hash = strip_0x(tx_hash)
self.signed_tx = strip_0x(signed_tx)
self.status = StatusEnum.PENDING
def __set_status(self, status, session):
@ -85,6 +97,8 @@ class Otx(SessionBase):
:param block: Block number
:type block: number
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
session = SessionBase.bind_session(session)
@ -105,6 +119,8 @@ class Otx(SessionBase):
Only manipulates object, does not transaction or commit to backend.
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.GAS_ISSUES):
@ -134,6 +150,10 @@ class Otx(SessionBase):
"""Marks transaction as "fubar." Any transaction marked this way is an anomaly and may be a symptom of a serious problem.
Only manipulates object, does not transaction or commit to backend.
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.UNKNOWN_ERROR):
return
@ -166,6 +186,10 @@ class Otx(SessionBase):
"""Marks transaction as "rejected," which means the node rejected sending the transaction to the network. The nonce has not been spent, and the transaction should be replaced.
Only manipulates object, does not transaction or commit to backend.
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.NODE_ERROR):
return
@ -201,6 +225,10 @@ class Otx(SessionBase):
"""Marks transaction as manually overridden.
Only manipulates object, does not transaction or commit to backend.
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
session = SessionBase.bind_session(session)
@ -228,10 +256,15 @@ class Otx(SessionBase):
def manual(self, session=None):
"""Marks transaction as having been manually overridden.
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
session = SessionBase.bind_session(session)
if self.status & StatusBits.FINAL:
status = status_str(self.status)
SessionBase.release_session(session)
@ -246,10 +279,10 @@ class Otx(SessionBase):
def retry(self, session=None):
"""Marks transaction as ready to retry after a timeout following a sendfail or a completed gas funding.
Only manipulates object, does not transaction or commit to backend.
"""Marks transaction as ready to retry after a timeout following a sendfail or a completed fee funding.
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.QUEUED):
@ -278,8 +311,8 @@ class Otx(SessionBase):
def readysend(self, session=None):
"""Marks transaction as ready for initial send attempt.
Only manipulates object, does not transaction or commit to backend.
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.QUEUED):
@ -308,8 +341,8 @@ class Otx(SessionBase):
def sent(self, session=None):
"""Marks transaction as having been sent to network.
Only manipulates object, does not transaction or commit to backend.
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.IN_NETWORK):
@ -339,8 +372,8 @@ class Otx(SessionBase):
def sendfail(self, session=None):
"""Marks that an attempt to send the transaction to the network has failed.
Only manipulates object, does not transaction or commit to backend.
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.NODE_ERROR):
@ -375,6 +408,8 @@ class Otx(SessionBase):
Only manipulates object, does not transaction or commit to backend.
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.RESERVED):
@ -412,6 +447,8 @@ class Otx(SessionBase):
:param block: Block number transaction was mined in.
:type block: number
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
if self.__status_already_set(StatusBits.NETWORK_ERROR):
@ -448,6 +485,8 @@ class Otx(SessionBase):
:param confirmed: Whether transition is to a final state.
:type confirmed: bool
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
session = SessionBase.bind_session(session)
@ -480,6 +519,8 @@ class Otx(SessionBase):
:param block: Block number transaction was mined in.
:type block: number
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:raises cic_eth.db.error.TxStateChangeError: State change represents a sequence of events that should not exist.
"""
@ -518,10 +559,13 @@ class Otx(SessionBase):
:type status: cic_eth.db.enum.StatusEnum
:param limit: Max results to return
:type limit: number
:param status_exact: Whether or not to perform exact status match
:type bool:
:param status_exact: If false, records where status integer value is less than or equal to the argument will be returned
:type status_exact: bool
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:returns: List of transaction hashes
:rtype: tuple, where first element is transaction hash
:todo: This approach is obsolete and this method may return unexpected results; the original status enum was organized so that higher status values matched state of processing towards final state. This is no longer the case.
"""
e = None
@ -542,6 +586,10 @@ class Otx(SessionBase):
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:rtype: chainqueue.db.models.otx.Otx
:returns: Matching otx record
"""
session = SessionBase.bind_session(session)
@ -553,8 +601,6 @@ class Otx(SessionBase):
return q.first()
def __state_log(self, session):
l = OtxStateLog(self)
session.add(l)
@ -563,6 +609,19 @@ class Otx(SessionBase):
# TODO: it is not safe to return otx here unless session has been passed in
@staticmethod
def add(nonce, tx_hash, signed_tx, session=None):
"""Add a new otx record to database.
The resulting Otx object will only be returned if the database session is provided by the caller. Otherwise, the returnvalue of the method will be None.
:param tx_hash: Transaction hash, in hex
:type tx_hash: str
:param signed_tx: Signed transaction data, in hex
:type signed_tx: str
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:rtype: chainqueue.db.models.otx.Otx
:returns: Matching otx record
"""
external_session = session != None
session = SessionBase.bind_session(session)
@ -580,10 +639,3 @@ class Otx(SessionBase):
return None
return otx
def __init__(self, nonce, tx_hash, signed_tx):
self.nonce = nonce
self.tx_hash = strip_0x(tx_hash)
self.signed_tx = strip_0x(signed_tx)
self.status = StatusEnum.PENDING

View File

@ -9,16 +9,21 @@ from .base import SessionBase
class OtxStateLog(SessionBase):
"""Records state change history for a transaction.
:param otx: Otx object to read and record state for
:type otx: chainqueue.db.models.otx.
"""
__tablename__ = 'otx_state_log'
date = Column(DateTime, default=datetime.datetime.utcnow)
"""Date for log entry"""
status = Column(Integer)
"""Status value after change"""
otx_id = Column(Integer, ForeignKey('otx.id'))
"""Foreign key of otx record"""
def __init__(self, otx):
self.otx_id = otx.id
self.status = otx.status
self.date = datetime.datetime.utcnow()

View File

@ -46,30 +46,67 @@ class TxCache(SessionBase):
:type to_value: number
:param block_number: Block height the transaction was mined at, or None if not yet mined
:type block_number: number or None
:param tx_number: Block transaction height the transaction was mined at, or None if not yet mined
:param tx_number: Transaction index in the block the transaction was mined in, or None if not yet mined
:type tx_number: number or None
:raises FileNotFoundError: Outgoing transaction for given transaction hash does not exist
:raises NotLocalTxError: Outgoing transaction for given transaction hash does not exist
"""
__tablename__ = 'tx_cache'
otx_id = Column(Integer, ForeignKey('otx.id'))
"""Foreign key to chainqueue.db.models.otx.Otx"""
source_token_address = Column(String(42))
"""Contract address of token that sender spent from"""
destination_token_address = Column(String(42))
"""Contract address of token that recipient will receive balance of"""
sender = Column(String(42))
"""Ethereum address of transaction sender"""
recipient = Column(String(42))
"""Ethereum address of transaction beneficiary (e.g. token transfer recipient)"""
from_value = Column(NUMERIC())
"""Amount of source tokens spent"""
to_value = Column(NUMERIC())
#block_number = Column(Integer())
"""Amount of destination tokens received"""
tx_index = Column(Integer())
"""Transaction index in the block the transaction was mined in, or None if not yet mined"""
date_created = Column(DateTime, default=datetime.datetime.utcnow)
"""Datetime record was created"""
date_updated = Column(DateTime, default=datetime.datetime.utcnow)
"""Datetime record was last updated"""
date_checked = Column(DateTime, default=datetime.datetime.utcnow)
"""Datetime record was last checked for state change"""
def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None):
session = SessionBase.bind_session(session)
q = session.query(Otx)
q = q.filter(Otx.tx_hash==strip_0x(tx_hash))
tx = q.first()
if tx == None:
SessionBase.release_session(session)
raise NotLocalTxError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash))
self.otx_id = tx.id
self.sender = sender
self.recipient = recipient
self.source_token_address = source_token_address
self.destination_token_address = destination_token_address
self.from_value = from_value
self.to_value = to_value
self.date_created = datetime.datetime.utcnow()
self.date_updated = self.date_created
self.date_checked = self.date_created
if tx_index != None and block_number != tx.block:
raise ValueError('Block number in argument {} does not match otx record {}'.format(block_number, tx.block))
self.tx_index = tx_index
SessionBase.release_session(session)
def check(self):
"""Update the "checked" timestamp to current time.
Only manipulates object, does not transaction or commit to backend.
:todo: evaluate whether this should consume a session like other methods
"""
self.date_checked = datetime.datetime.utcnow()
@ -86,6 +123,10 @@ class TxCache(SessionBase):
:type tx_hash_original: str, 0x-hex
:param tx_hash_new: tx hash to associate the copied entry with
:type tx_hash_new: str, 0x-hex
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:raise NotLocalTxError: Transaction does not exist in the local queue, or could not be created
:raies TxStateChangeError: Attempt to clone an already confirmed transaction
"""
session = SessionBase.bind_session(session)
@ -132,6 +173,8 @@ class TxCache(SessionBase):
:param account_address: Ethereum address to use in query.
:type account_address: str, 0x-hex
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:returns: Outgoing transactions
:rtype: tuple, where first element is transaction hash
"""
@ -148,6 +191,20 @@ class TxCache(SessionBase):
@staticmethod
def set_final(tx_hash, block_number, tx_index, session=None):
"""Sets the transaction index for the confirmed transaction.
The block number of the block that included the transaction must be set in the otx record before this method is called (see chainqueue.db.models.otx.Otx.minefail and chainqueue.db.models.otx.Otx.success). The block number in the record must match the block number given as argument.
:param tx_hash: Transaction hash, in hex
:type tx_hash: str
:param block_number: Block number transaction was included in
:type block_number: int
:param tx_index: The transaction index of the block to cache
:type tx_index: int
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:raises NotLocalTxError: Transaction does not exist in local queue, or block column is not set in otx record.
"""
session = SessionBase.bind_session(session)
q = session.query(TxCache)
@ -172,6 +229,10 @@ class TxCache(SessionBase):
:param tx_hash: Transaction hash
:type tx_hash: str, 0x-hex
:param session: Sqlalchemy database session
:type session: sqlalchemy.orm.Session
:rtype: chainqueue.db.models.tx.TxCache
:returns: Transaction cache object
"""
session = SessionBase.bind_session(session)
@ -183,29 +244,3 @@ class TxCache(SessionBase):
return q.first()
def __init__(self, tx_hash, sender, recipient, source_token_address, destination_token_address, from_value, to_value, block_number=None, tx_index=None, session=None):
session = SessionBase.bind_session(session)
q = session.query(Otx)
q = q.filter(Otx.tx_hash==strip_0x(tx_hash))
tx = q.first()
if tx == None:
SessionBase.release_session(session)
raise FileNotFoundError('outgoing transaction record unknown {} (add a Tx first)'.format(tx_hash))
self.otx_id = tx.id
self.sender = sender
self.recipient = recipient
self.source_token_address = source_token_address
self.destination_token_address = destination_token_address
self.from_value = from_value
self.to_value = to_value
self.block_number = block_number
self.tx_index = tx_index
# not automatically set in sqlite, it seems:
self.date_created = datetime.datetime.utcnow()
self.date_updated = self.date_created
self.date_checked = self.date_created
SessionBase.release_session(session)

View File

@ -7,21 +7,33 @@ class StatusBits(enum.IntEnum):
"""Individual bit flags that are combined to define the state and legacy of a queued transaction
"""
QUEUED = 0x01 # transaction should be sent to network
RESERVED = 0x02 # transaction is currently being handled by a thread
IN_NETWORK = 0x08 # transaction is in network
QUEUED = 0x01
"""Transaction should be sent to network"""
RESERVED = 0x02
"""Transaction is currently being handled by a thread"""
IN_NETWORK = 0x08
"""Transaction is in network"""
DEFERRED = 0x10 # an attempt to send the transaction to network has failed
GAS_ISSUES = 0x20 # transaction is pending sender account gas funding
DEFERRED = 0x10
"""An attempt to send the transaction to network has failed"""
GAS_ISSUES = 0x20
"""Transaction is pending sender account fee funding"""
LOCAL_ERROR = 0x100 # errors that originate internally from the component
NODE_ERROR = 0x200 # errors originating in the node (invalid RLP input...)
NETWORK_ERROR = 0x400 # errors that originate from the network (REVERT)
UNKNOWN_ERROR = 0x800 # unclassified errors (the should not occur)
LOCAL_ERROR = 0x100
"""Errors that originate internally from the component"""
NODE_ERROR = 0x200
"""Errors originating in the node (e.g. invalid transaction wire format input...)"""
NETWORK_ERROR = 0x400
"""Errors that originate from the network (REVERT)"""
UNKNOWN_ERROR = 0x800
"""Unclassified errors (that should not occur)"""
FINAL = 0x1000 # transaction processing has completed
OBSOLETE = 0x2000 # transaction has been replaced by a different transaction with higher fee
MANUAL = 0x8000 # transaction processing has been manually overridden
FINAL = 0x1000
"""Transaction processing has completed"""
OBSOLETE = 0x2000
"""Transaction has been replaced by a different transaction (normally with higher fee)"""
MANUAL = 0x8000
"""Transaction processing has been manually overridden"""
@enum.unique
@ -29,24 +41,38 @@ class StatusEnum(enum.IntEnum):
"""Semantic states intended for human consumption
"""
PENDING = 0
"""Transaction has been added but no processing has been performed"""
SENDFAIL = StatusBits.DEFERRED | StatusBits.LOCAL_ERROR
"""Temporary error occurred when sending transaction to node"""
RETRY = StatusBits.QUEUED | StatusBits.DEFERRED
"""Transaction is ready to retry send to the network"""
READYSEND = StatusBits.QUEUED
"""Transaction is ready to be sent to network for first time"""
RESERVED = StatusBits.RESERVED
"""Transaction is currently being handled by a thread"""
OBSOLETED = StatusBits.OBSOLETE | StatusBits.IN_NETWORK
"""Transaction already in network has been replaced by a different transaction"""
WAITFORGAS = StatusBits.GAS_ISSUES
"""Transaction is pending sender account fee funding"""
SENT = StatusBits.IN_NETWORK
"""Transaction is in network"""
FUBAR = StatusBits.FINAL | StatusBits.UNKNOWN_ERROR
"""Transaction processing encountered an unknown error and will not be processed further"""
CANCELLED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.OBSOLETE
"""A superseding transaction was confirmed by the network, and the current transaction will not be processed further"""
OVERRIDDEN = StatusBits.FINAL | StatusBits.OBSOLETE | StatusBits.MANUAL
"""A superseding transaction was manually added. The current transaction will not be sent to network nor processed further"""
REJECTED = StatusBits.NODE_ERROR | StatusBits.FINAL
"""A permanent error occurred when attempting to send transaction to node"""
REVERTED = StatusBits.IN_NETWORK | StatusBits.FINAL | StatusBits.NETWORK_ERROR
"""Execution of transaction on network completed and was unsuccessful."""
SUCCESS = StatusBits.IN_NETWORK | StatusBits.FINAL
"""Execution of transaction on network completed and was successful"""
def status_str(v, bits_only=False):