2021-02-01 18:12:51 +01:00
# standard imports
import datetime
import logging
2021-03-04 09:05:05 +01:00
# external imports
2021-02-01 18:12:51 +01:00
from sqlalchemy import Column , Enum , String , Integer , DateTime , Text , or_ , ForeignKey
from sqlalchemy . ext . hybrid import hybrid_property , hybrid_method
# local imports
from . base import SessionBase
2021-02-13 18:01:48 +01:00
from cic_eth . db . enum import (
StatusEnum ,
StatusBits ,
status_str ,
is_error_status ,
)
2021-02-01 18:12:51 +01:00
from cic_eth . db . error import TxStateChangeError
#from cic_eth.eth.util import address_hex_from_signed_tx
logg = logging . getLogger ( )
class OtxStateLog ( SessionBase ) :
__tablename__ = ' otx_state_log '
date = Column ( DateTime , default = datetime . datetime . utcnow )
status = Column ( Integer )
otx_id = Column ( Integer , ForeignKey ( ' otx.id ' ) )
def __init__ ( self , otx ) :
self . otx_id = otx . id
self . status = otx . status
class Otx ( SessionBase ) :
""" Outgoing transactions with local origin.
: param nonce : Transaction nonce
: type nonce : number
: param address : Ethereum address of recipient - NOT IN USE , REMOVE
: type address : str
: param tx_hash : Tranasction hash
: type tx_hash : str , 0 x - hex
: param signed_tx : Signed raw transaction data
: type signed_tx : str , 0 x - hex
"""
__tablename__ = ' otx '
tracing = False
""" Whether to enable queue state tracing """
nonce = Column ( Integer )
date_created = Column ( DateTime , default = datetime . datetime . utcnow )
tx_hash = Column ( String ( 66 ) )
signed_tx = Column ( Text )
status = Column ( Integer )
block = Column ( Integer )
2021-02-13 18:01:48 +01:00
def __set_status ( self , status , session ) :
self . status | = status
session . add ( self )
session . flush ( )
2021-02-01 18:12:51 +01:00
2021-02-13 18:01:48 +01:00
def __reset_status ( self , status , session ) :
status_edit = ~ status & self . status
self . status & = status_edit
session . add ( self )
session . flush ( )
2021-02-01 18:12:51 +01:00
2021-02-13 18:01:48 +01:00
def __status_already_set ( self , status ) :
r = bool ( self . status & status )
if r :
logg . warning ( ' status bit {} already set on {} ' . format ( status . name , self . tx_hash ) )
return r
2021-02-01 18:12:51 +01:00
2021-03-01 21:15:17 +01:00
def __status_not_set ( self , status ) :
r = not ( self . status & status )
if r :
logg . warning ( ' status bit {} not set on {} ' . format ( status . name , self . tx_hash ) )
return r
2021-02-01 18:12:51 +01:00
def set_block ( self , block , session = None ) :
""" Set block number transaction was mined in.
Only manipulates object , does not transaction or commit to backend .
: param block : Block number
: type block : number
: raises cic_eth . db . error . TxStateChangeError : State change represents a sequence of events that should not exist .
"""
localsession = session
if localsession == None :
localsession = SessionBase . create_session ( )
if self . block != None :
raise TxStateChangeError ( ' Attempted set block {} when block was already {} ' . format ( block , self . block ) )
self . block = block
localsession . add ( self )
localsession . flush ( )
if session == None :
localsession . commit ( )
localsession . close ( )
def waitforgas ( self , session = None ) :
""" Marks transaction as suspended pending gas funding.
Only manipulates object , does not transaction or commit to backend .
: raises cic_eth . db . error . TxStateChangeError : State change represents a sequence of events that should not exist .
"""
2021-02-13 18:01:48 +01:00
if self . __status_already_set ( StatusBits . GAS_ISSUES ) :
return
session = SessionBase . bind_session ( session )
if self . status & StatusBits . FINAL :
raise TxStateChangeError ( ' GAS_ISSUES cannot be set on an entry with FINAL state set ( {} ) ' . format ( status_str ( self . status ) ) )
if self . status & StatusBits . IN_NETWORK :
raise TxStateChangeError ( ' GAS_ISSUES cannot be set on an entry with IN_NETWORK state set ( {} ) ' . format ( status_str ( self . status ) ) )
self . __set_status ( StatusBits . GAS_ISSUES , session )
self . __reset_status ( StatusBits . QUEUED | StatusBits . DEFERRED , session )
if self . tracing :
self . __state_log ( session = session )
SessionBase . release_session ( session )
2021-02-01 18:12:51 +01:00
def fubar ( self , session = None ) :
""" Marks transaction as " fubar. " Any transaction marked this way is an anomaly and may be a symptom of a serious problem.
Only manipulates object , does not transaction or commit to backend .
"""
2021-02-13 18:01:48 +01:00
if self . __status_already_set ( StatusBits . UNKNOWN_ERROR ) :
return
session = SessionBase . bind_session ( session )
if self . status & StatusBits . FINAL :
raise TxStateChangeError ( ' FUBAR cannot be set on an entry with FINAL state set ( {} ) ' . format ( status_str ( self . status ) ) )
if is_error_status ( self . status ) :
raise TxStateChangeError ( ' FUBAR cannot be set on an entry with an error state already set ( {} ) ' . format ( status_str ( self . status ) ) )
self . __set_status ( StatusBits . UNKNOWN_ERROR | StatusBits . FINAL , session )
2021-02-01 18:12:51 +01:00
2021-02-13 18:01:48 +01:00
if self . tracing :
self . __state_log ( session = session )
SessionBase . release_session ( session )
2021-02-01 18:12:51 +01:00
def reject ( self , session = None ) :
""" Marks transaction as " rejected, " which means the node rejected sending the transaction to the network. The nonce has not been spent, and the transaction should be replaced.
Only manipulates object , does not transaction or commit to backend .
"""
2021-02-13 18:01:48 +01:00
if self . __status_already_set ( StatusBits . NODE_ERROR ) :
return
session = SessionBase . bind_session ( session )
if self . status & StatusBits . FINAL :
raise TxStateChangeError ( ' REJECTED cannot be set on an entry with FINAL state set ( {} ) ' . format ( status_str ( self . status ) ) )
if self . status & StatusBits . IN_NETWORK :
raise TxStateChangeError ( ' REJECTED cannot be set on an entry already IN_NETWORK ( {} ) ' . format ( status_str ( self . status ) ) )
if is_error_status ( self . status ) :
raise TxStateChangeError ( ' REJECTED cannot be set on an entry with an error state already set ( {} ) ' . format ( status_str ( self . status ) ) )
self . __set_status ( StatusBits . NODE_ERROR | StatusBits . FINAL , session )
2021-02-01 18:12:51 +01:00
2021-02-13 18:01:48 +01:00
if self . tracing :
self . __state_log ( session = session )
2021-02-01 18:12:51 +01:00
2021-02-13 18:01:48 +01:00
SessionBase . release_session ( session )
def override ( self , manual = False , session = None ) :
2021-02-01 18:12:51 +01:00
""" Marks transaction as manually overridden.
Only manipulates object , does not transaction or commit to backend .
"""
2021-02-13 18:01:48 +01:00
session = SessionBase . bind_session ( session )
if self . status & StatusBits . FINAL :
raise TxStateChangeError ( ' OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ( {} ) ' . format ( status_str ( self . status ) ) )
if self . status & StatusBits . IN_NETWORK :
raise TxStateChangeError ( ' OVERRIDDEN/OBSOLETED cannot be set on an entry already IN_NETWORK ( {} ) ' . format ( status_str ( self . status ) ) )
if self . status & StatusBits . OBSOLETE :
raise TxStateChangeError ( ' OVERRIDDEN/OBSOLETED cannot be set on an entry already OBSOLETE ( {} ) ' . format ( status_str ( self . status ) ) )
self . __set_status ( StatusBits . OBSOLETE , session )
#if manual:
# self.__set_status(StatusBits.MANUAL, session)
self . __reset_status ( StatusBits . QUEUED | StatusBits . IN_NETWORK , session )
if self . tracing :
self . __state_log ( session = session )
SessionBase . release_session ( session )
2021-02-01 18:12:51 +01:00
2021-02-13 18:01:48 +01:00
def manual ( self , session = None ) :
session = SessionBase . bind_session ( session )
if self . status & StatusBits . FINAL :
raise TxStateChangeError ( ' OVERRIDDEN/OBSOLETED cannot be set on an entry with FINAL state set ( {} ) ' . format ( status_str ( self . status ) ) )
self . __set_status ( StatusBits . MANUAL , session )
if self . tracing :
self . __state_log ( session = session )
SessionBase . release_session ( session )
2021-02-01 18:12:51 +01:00
def retry ( self , session = None ) :
""" Marks transaction as ready to retry after a timeout following a sendfail or a completed gas funding.
Only manipulates object , does not transaction or commit to backend .
: raises cic_eth . db . error . TxStateChangeError : State change represents a sequence of events that should not exist .
"""
2021-02-13 18:01:48 +01:00
if self . __status_already_set ( StatusBits . QUEUED ) :
return
session = SessionBase . bind_session ( session )
if self . status & StatusBits . FINAL :
raise TxStateChangeError ( ' RETRY cannot be set on an entry with FINAL state set ( {} ) ' . format ( status_str ( self . status ) ) )
if not is_error_status ( self . status ) and not StatusBits . IN_NETWORK & self . status > 0 :
raise TxStateChangeError ( ' RETRY cannot be set on an entry that has no error ( {} ) ' . format ( status_str ( self . status ) ) )
self . __set_status ( StatusBits . QUEUED , session )
self . __reset_status ( StatusBits . GAS_ISSUES , session )
if self . tracing :
self . __state_log ( session = session )
SessionBase . release_session ( session )
2021-02-01 18:12:51 +01:00
def readysend ( self , session = None ) :
""" Marks transaction as ready for initial send attempt.
Only manipulates object , does not transaction or commit to backend .
: raises cic_eth . db . error . TxStateChangeError : State change represents a sequence of events that should not exist .
"""
2021-02-13 18:01:48 +01:00
if self . __status_already_set ( StatusBits . QUEUED ) :
return
session = SessionBase . bind_session ( session )
if self . status & StatusBits . FINAL :
raise TxStateChangeError ( ' READYSEND cannot be set on an entry with FINAL state set ( {} ) ' . format ( status_str ( self . status ) ) )
if is_error_status ( self . status ) :
raise TxStateChangeError ( ' READYSEND cannot be set on an errored state ( {} ) ' . format ( status_str ( self . status ) ) )
self . __set_status ( StatusBits . QUEUED , session )
self . __reset_status ( StatusBits . GAS_ISSUES , session )
if self . tracing :
self . __state_log ( session = session )
SessionBase . release_session ( session )
2021-02-01 18:12:51 +01:00
def sent ( self , session = None ) :
""" Marks transaction as having been sent to network.
Only manipulates object , does not transaction or commit to backend .
: raises cic_eth . db . error . TxStateChangeError : State change represents a sequence of events that should not exist .
"""
2021-02-13 18:01:48 +01:00
if self . __status_already_set ( StatusBits . IN_NETWORK ) :
return
session = SessionBase . bind_session ( session )
if self . status & StatusBits . FINAL :
raise TxStateChangeError ( ' SENT cannot be set on an entry with FINAL state set ( {} ) ' . format ( status_str ( self . status ) ) )
self . __set_status ( StatusBits . IN_NETWORK , session )
self . __reset_status ( StatusBits . DEFERRED | StatusBits . QUEUED | StatusBits . LOCAL_ERROR | StatusBits . NODE_ERROR , session )
if self . tracing :
self . __state_log ( session = session )
SessionBase . release_session ( session )
2021-02-01 18:12:51 +01:00
def sendfail ( self , session = None ) :
""" Marks that an attempt to send the transaction to the network has failed.
Only manipulates object , does not transaction or commit to backend .
: raises cic_eth . db . error . TxStateChangeError : State change represents a sequence of events that should not exist .
"""
2021-02-13 18:01:48 +01:00
if self . __status_already_set ( StatusBits . NODE_ERROR ) :
return
session = SessionBase . bind_session ( session )
if self . status & StatusBits . FINAL :
raise TxStateChangeError ( ' SENDFAIL cannot be set on an entry with FINAL state set ( {} ) ' . format ( status_str ( self . status ) ) )
if self . status & StatusBits . IN_NETWORK :
raise TxStateChangeError ( ' SENDFAIL cannot be set on an entry with IN_NETWORK state set ( {} ) ' . format ( status_str ( self . status ) ) )
self . __set_status ( StatusBits . LOCAL_ERROR | StatusBits . DEFERRED , session )
self . __reset_status ( StatusBits . QUEUED | StatusBits . GAS_ISSUES , session )
if self . tracing :
self . __state_log ( session = session )
SessionBase . release_session ( session )
2021-02-01 18:12:51 +01:00
2021-03-01 21:15:17 +01:00
def dequeue ( self , session = None ) :
""" Marks that a process to execute send attempt is underway
Only manipulates object , does not transaction or commit to backend .
: raises cic_eth . db . error . TxStateChangeError : State change represents a sequence of events that should not exist .
"""
if self . __status_not_set ( StatusBits . QUEUED ) :
return
session = SessionBase . bind_session ( session )
if self . status & StatusBits . FINAL :
raise TxStateChangeError ( ' SENDFAIL cannot be set on an entry with FINAL state set ( {} ) ' . format ( status_str ( self . status ) ) )
if self . status & StatusBits . IN_NETWORK :
raise TxStateChangeError ( ' SENDFAIL cannot be set on an entry with IN_NETWORK state set ( {} ) ' . format ( status_str ( self . status ) ) )
self . __reset_status ( StatusBits . QUEUED , session )
if self . tracing :
self . __state_log ( session = session )
SessionBase . release_session ( session )
2021-02-01 18:12:51 +01:00
def minefail ( self , block , session = None ) :
""" Marks that transaction was mined but code execution did not succeed.
Only manipulates object , does not transaction or commit to backend .
: param block : Block number transaction was mined in .
: type block : number
: raises cic_eth . db . error . TxStateChangeError : State change represents a sequence of events that should not exist .
"""
2021-02-13 18:01:48 +01:00
if self . __status_already_set ( StatusBits . NETWORK_ERROR ) :
return
session = SessionBase . bind_session ( session )
if self . status & StatusBits . FINAL :
raise TxStateChangeError ( ' REVERTED cannot be set on an entry with FINAL state set ( {} ) ' . format ( status_str ( self . status ) ) )
if not self . status & StatusBits . IN_NETWORK :
raise TxStateChangeError ( ' REVERTED cannot be set on an entry without IN_NETWORK state set ( {} ) ' . format ( status_str ( self . status ) ) )
2021-02-01 18:12:51 +01:00
if block != None :
self . block = block
2021-02-13 18:01:48 +01:00
self . __set_status ( StatusBits . NETWORK_ERROR | StatusBits . FINAL , session )
if self . tracing :
self . __state_log ( session = session )
SessionBase . release_session ( session )
2021-02-01 18:12:51 +01:00
def cancel ( self , confirmed = False , session = None ) :
""" Marks that the transaction has been succeeded by a new transaction with same nonce.
If set to confirmed , the previous state must be OBSOLETED , and will transition to CANCELLED - a finalized state . Otherwise , the state must follow a non - finalized state , and will be set to OBSOLETED .
Only manipulates object , does not transaction or commit to backend .
: param confirmed : Whether transition is to a final state .
: type confirmed : bool
: raises cic_eth . db . error . TxStateChangeError : State change represents a sequence of events that should not exist .
"""
2021-02-13 18:01:48 +01:00
session = SessionBase . bind_session ( session )
if self . status & StatusBits . FINAL :
raise TxStateChangeError ( ' CANCEL cannot be set on an entry with FINAL state set ( {} ) ' . format ( status_str ( self . status ) ) )
2021-02-01 18:12:51 +01:00
if confirmed :
2021-03-06 18:01:51 +01:00
if self . status > 0 and not self . status & StatusBits . OBSOLETE :
2021-02-13 18:01:48 +01:00
raise TxStateChangeError ( ' CANCEL can only be set on an entry marked OBSOLETE ( {} ) ' . format ( status_str ( self . status ) ) )
2021-02-01 18:12:51 +01:00
self . __set_status ( StatusEnum . CANCELLED , session )
2021-02-13 18:01:48 +01:00
else :
2021-02-01 18:12:51 +01:00
self . __set_status ( StatusEnum . OBSOLETED , session )
2021-02-13 18:01:48 +01:00
if self . tracing :
self . __state_log ( session = session )
SessionBase . release_session ( session )
2021-02-01 18:12:51 +01:00
def success ( self , block , session = None ) :
""" Marks that transaction was successfully mined.
Only manipulates object , does not transaction or commit to backend .
: param block : Block number transaction was mined in .
: type block : number
: raises cic_eth . db . error . TxStateChangeError : State change represents a sequence of events that should not exist .
"""
2021-02-13 18:01:48 +01:00
session = SessionBase . bind_session ( session )
if self . status & StatusBits . FINAL :
raise TxStateChangeError ( ' SUCCESS cannot be set on an entry with FINAL state set ( {} ) ' . format ( status_str ( self . status ) ) )
if not self . status & StatusBits . IN_NETWORK :
raise TxStateChangeError ( ' SUCCESS cannot be set on an entry without IN_NETWORK state set ( {} ) ' . format ( status_str ( self . status ) ) )
if is_error_status ( self . status ) :
raise TxStateChangeError ( ' SUCCESS cannot be set on an entry with error state set ( {} ) ' . format ( status_str ( self . status ) ) )
2021-02-01 18:12:51 +01:00
if block != None :
self . block = block
self . __set_status ( StatusEnum . SUCCESS , session )
2021-02-13 18:01:48 +01:00
if self . tracing :
self . __state_log ( session = session )
SessionBase . release_session ( session )
2021-02-01 18:12:51 +01:00
@staticmethod
2021-02-17 10:33:18 +01:00
def get ( status = 0 , limit = 4096 , status_exact = True , session = None ) :
2021-02-01 18:12:51 +01:00
""" Returns outgoing transaction lists by status.
Status may either be matched exactly , or be an upper bound of the integer value of the status enum .
: param status : Status value to use in query
: type status : cic_eth . db . enum . StatusEnum
: param limit : Max results to return
: type limit : number
: param status_exact : Whether or not to perform exact status match
: type bool :
: returns : List of transaction hashes
: rtype : tuple , where first element is transaction hash
"""
e = None
2021-02-17 10:33:18 +01:00
session = SessionBase . bind_session ( session )
2021-02-01 18:12:51 +01:00
if status_exact :
e = session . query ( Otx . tx_hash ) . filter ( Otx . status == status ) . order_by ( Otx . date_created . asc ( ) ) . limit ( limit ) . all ( )
else :
e = session . query ( Otx . tx_hash ) . filter ( Otx . status < = status ) . order_by ( Otx . date_created . asc ( ) ) . limit ( limit ) . all ( )
2021-02-17 10:33:18 +01:00
SessionBase . release_session ( session )
2021-02-01 18:12:51 +01:00
return e
@staticmethod
2021-02-17 10:33:18 +01:00
def load ( tx_hash , session = None ) :
2021-02-01 18:12:51 +01:00
""" Retrieves the outgoing transaction record by transaction hash.
: param tx_hash : Transaction hash
: type tx_hash : str , 0 x - hex
"""
2021-02-17 10:33:18 +01:00
session = SessionBase . bind_session ( session )
2021-02-01 18:12:51 +01:00
q = session . query ( Otx )
q = q . filter ( Otx . tx_hash == tx_hash )
2021-02-17 10:33:18 +01:00
SessionBase . release_session ( session )
2021-02-01 18:12:51 +01:00
return q . first ( )
@staticmethod
def account ( account_address ) :
""" Retrieves all transaction hashes for which the given Ethereum address is sender or recipient.
: param account_address : Ethereum address to use in query .
: type account_address : str , 0 x - hex
: returns : Outgoing transactions
: rtype : tuple , where first element is transaction hash
"""
session = Otx . create_session ( )
q = session . query ( Otx . tx_hash )
q = q . join ( TxCache )
q = q . filter ( or_ ( TxCache . sender == account_address , TxCache . recipient == account_address ) )
txs = q . all ( )
session . close ( )
return list ( txs )
def __state_log ( self , session ) :
l = OtxStateLog ( self )
session . add ( l )
@staticmethod
def add ( nonce , address , tx_hash , signed_tx , session = None ) :
localsession = session
if localsession == None :
localsession = SessionBase . create_session ( )
otx = Otx ( nonce , address , tx_hash , signed_tx )
localsession . add ( otx )
localsession . flush ( )
if otx . tracing :
otx . __state_log ( session = localsession )
localsession . flush ( )
if session == None :
localsession . commit ( )
localsession . close ( )
return None
return otx
def __init__ ( self , nonce , address , tx_hash , signed_tx ) :
self . nonce = nonce
self . tx_hash = tx_hash
self . signed_tx = signed_tx
self . status = StatusEnum . PENDING
signed_tx_bytes = bytes . fromhex ( signed_tx [ 2 : ] )
# sender_address = address_hex_from_signed_tx(signed_tx_bytes)
# logg.debug('decoded tx {}'.format(sender_address))
# TODO: Most of the methods on this object are obsolete, but it contains a static function for retrieving "expired" outgoing transactions that should be moved to Otx instead.
class OtxSync ( SessionBase ) :
""" Obsolete
"""
__tablename__ = ' otx_sync '
blockchain = Column ( String )
block_height_backlog = Column ( Integer )
tx_height_backlog = Column ( Integer )
block_height_session = Column ( Integer )
tx_height_session = Column ( Integer )
block_height_head = Column ( Integer )
tx_height_head = Column ( Integer )
date_created = Column ( DateTime , default = datetime . datetime . utcnow )
date_updated = Column ( DateTime )
def backlog ( self , block_height = None , tx_height = None ) :
#session = OtxSync.create_session()
if block_height != None :
if tx_height == None :
raise ValueError ( ' tx height missing ' )
self . block_height_backlog = block_height
self . tx_height_backlog = tx_height
#session.add(self)
self . date_updated = datetime . datetime . utcnow ( )
#session.commit()
block_height = self . block_height_backlog
tx_height = self . tx_height_backlog
#session.close()
return ( block_height , tx_height )
def session ( self , block_height = None , tx_height = None ) :
#session = OtxSync.create_session()
if block_height != None :
if tx_height == None :
raise ValueError ( ' tx height missing ' )
self . block_height_session = block_height
self . tx_height_session = tx_height
#session.add(self)
self . date_updated = datetime . datetime . utcnow ( )
#session.commit()
block_height = self . block_height_session
tx_height = self . tx_height_session
#session.close()
return ( block_height , tx_height )
def head ( self , block_height = None , tx_height = None ) :
#session = OtxSync.create_session()
if block_height != None :
if tx_height == None :
raise ValueError ( ' tx height missing ' )
self . block_height_head = block_height
self . tx_height_head = tx_height
#session.add(self)
self . date_updated = datetime . datetime . utcnow ( )
#session.commit()
block_height = self . block_height_head
tx_height = self . tx_height_head
#session.close()
return ( block_height , tx_height )
@hybrid_property
def synced ( self ) :
#return self.block_height_session == self.block_height_backlog and self.tx_height_session == self.block_height_backlog
return self . block_height_session == self . block_height_backlog and self . tx_height_session == self . tx_height_backlog
@staticmethod
def load ( blockchain_string , session ) :
q = session . query ( OtxSync )
q = q . filter ( OtxSync . blockchain == blockchain_string )
return q . first ( )
@staticmethod
def latest ( nonce ) :
session = SessionBase . create_session ( )
otx = session . query ( Otx ) . filter ( Otx . nonce == nonce ) . order_by ( Otx . created . desc ( ) ) . first ( )
session . close ( )
return otx
@staticmethod
def get_expired ( datetime_threshold ) :
session = SessionBase . create_session ( )
q = session . query ( Otx )
q = q . filter ( Otx . date_created < datetime_threshold )
q = q . filter ( Otx . status == StatusEnum . SENT )
q = q . order_by ( Otx . date_created . desc ( ) )
q = q . group_by ( Otx . nonce )
q = q . group_by ( Otx . id )
otxs = q . all ( )
session . close ( )
return otxs
def chain ( self ) :
return self . blockchain
def __init__ ( self , blockchain ) :
self . blockchain = blockchain
self . block_height_head = 0
self . tx_height_head = 0
self . block_height_session = 0
self . tx_height_session = 0
self . block_height_backlog = 0
self . tx_height_backlog = 0