Add tx data normalizer to sql backend

This commit is contained in:
nolash 2021-08-28 03:08:22 +02:00
parent 876c6cb458
commit faa9988d53
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
4 changed files with 32 additions and 32 deletions

View File

@ -44,7 +44,7 @@ class Otx(SessionBase):
"""Datetime when record was created""" """Datetime when record was created"""
date_updated = Column(DateTime, default=datetime.datetime.utcnow) date_updated = Column(DateTime, default=datetime.datetime.utcnow)
"""Datetime when record was last updated""" """Datetime when record was last updated"""
tx_hash = Column(String(66)) tx_hash = Column(String())
"""Tranasction hash""" """Tranasction hash"""
signed_tx = Column(Text) signed_tx = Column(Text)
"""Signed raw transaction data""" """Signed raw transaction data"""

View File

@ -54,13 +54,13 @@ class TxCache(SessionBase):
otx_id = Column(Integer, ForeignKey('otx.id')) otx_id = Column(Integer, ForeignKey('otx.id'))
"""Foreign key to chainqueue.db.models.otx.Otx""" """Foreign key to chainqueue.db.models.otx.Otx"""
source_token_address = Column(String(42)) source_token_address = Column(String())
"""Contract address of token that sender spent from""" """Contract address of token that sender spent from"""
destination_token_address = Column(String(42)) destination_token_address = Column(String())
"""Contract address of token that recipient will receive balance of""" """Contract address of token that recipient will receive balance of"""
sender = Column(String(42)) sender = Column(String())
"""Ethereum address of transaction sender""" """Ethereum address of transaction sender"""
recipient = Column(String(42)) recipient = Column(String())
"""Ethereum address of transaction beneficiary (e.g. token transfer recipient)""" """Ethereum address of transaction beneficiary (e.g. token transfer recipient)"""
from_value = Column(NUMERIC()) from_value = Column(NUMERIC())
"""Amount of source tokens spent""" """Amount of source tokens spent"""

View File

@ -1,6 +1,7 @@
# standard imports # standard imports
import logging import logging
import urllib.error import urllib.error
import copy
# external imports # external imports
from sqlalchemy.exc import ( from sqlalchemy.exc import (
@ -11,11 +12,6 @@ from chainlib.error import (
RPCNonceException, RPCNonceException,
DefaultErrorParser, DefaultErrorParser,
) )
from hexathon import (
add_0x,
strip_0x,
uniform as hex_uniform,
)
# local imports # local imports
from chainqueue.sql.tx import create as queue_create from chainqueue.sql.tx import create as queue_create
@ -33,6 +29,7 @@ from chainqueue.sql.state import (
set_rejected, set_rejected,
) )
from chainqueue.sql.tx import cache_tx_dict from chainqueue.sql.tx import cache_tx_dict
from chainqueue.encode import TxHexNormalize
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@ -52,12 +49,15 @@ class SQLBackend:
""" """
#def __init__(self, conn_spec, error_parser=None, *args, **kwargs): #def __init__(self, conn_spec, error_parser=None, *args, **kwargs):
def __init__(self, conn_spec, error_parser=None, pool_size=0, debug=False, *args, **kwargs): def __init__(self, conn_spec, tx_normalizer=None, error_parser=None, pool_size=0, debug=False, *args, **kwargs):
#SessionBase.connect(conn_spec, pool_size=kwargs.get('poolsize', 0), debug=kwargs.get('debug', False)) #SessionBase.connect(conn_spec, pool_size=kwargs.get('poolsize', 0), debug=kwargs.get('debug', False))
SessionBase.connect(conn_spec, pool_size=pool_size, debug=debug) SessionBase.connect(conn_spec, pool_size=pool_size, debug=debug)
if error_parser == None: if error_parser == None:
error_parser = DefaultErrorParser() error_parser = DefaultErrorParser()
self.error_parser = error_parser self.error_parser = error_parser
if tx_normalizer == None:
tx_normalizer = TxHexNormalize()
self.tx_normalizer = tx_normalizer
def create(self, chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_predecessors=True, session=None): def create(self, chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_predecessors=True, session=None):
@ -82,6 +82,8 @@ class SQLBackend:
:rtype: int :rtype: int
:returns: 0 if successfully added :returns: 0 if successfully added
""" """
tx_hash = self.tx_normalizer.tx_hash(tx_hash)
signed_tx = self.tx_normalizer.tx_wire(signed_tx)
try: try:
queue_create(chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_predecessors=True, session=session) queue_create(chain_spec, nonce, holder_address, tx_hash, signed_tx, obsolete_predecessors=True, session=session)
except IntegrityError as e: except IntegrityError as e:
@ -101,8 +103,15 @@ class SQLBackend:
:rtype: int :rtype: int
:returns: 0 if successful :returns: 0 if successful
""" """
(tx, txc_id) = cache_tx_dict(tx, session=session) ntx = copy.copy(tx)
logg.debug('cached {} db insert id {}'.format(tx, txc_id)) ntx['hash'] = self.tx_normalizer.tx_hash(ntx['hash'])
ntx['from'] = self.tx_normalizer.wallet_address(ntx['from'])
ntx['to'] = self.tx_normalizer.wallet_address(ntx['to'])
ntx['source_token'] = self.tx_normalizer.executable_address(ntx['source_token'])
ntx['destination_token'] = self.tx_normalizer.executable_address(ntx['destination_token'])
(tx_dict, txc_id) = cache_tx_dict(ntx, session=session)
logg.debug('cached {} db insert id {}'.format(tx_dict, txc_id))
return 0 return 0
@ -120,6 +129,7 @@ class SQLBackend:
:rtype: dict :rtype: dict
:returns: otx record summary :returns: otx record summary
""" """
tx_hash = self.tx_normalizer.tx_hash(tx_hash)
return backend_get_tx(chain_spec, tx_hash, session=session) return backend_get_tx(chain_spec, tx_hash, session=session)
@ -170,6 +180,7 @@ class SQLBackend:
:rtype: int :rtype: int
:returns: 0 if no error :returns: 0 if no error
""" """
tx_hash = self.tx_normalizer.tx_hash(tx_hash)
set_reserved(chain_spec, tx_hash, session=session) set_reserved(chain_spec, tx_hash, session=session)
fail = False fail = False
r = 1 r = 1

View File

@ -1,6 +1,5 @@
# standard imports # standard imports
import logging import logging
import copy
# external imports # external imports
from hexathon import ( from hexathon import (
@ -95,24 +94,14 @@ def cache_tx_dict(tx_dict, session=None):
""" """
session = SessionBase.bind_session(session) session = SessionBase.bind_session(session)
ntx = copy.copy(tx_dict)
for k in [
'hash',
'from',
'to',
'source_token',
'destination_token',
]:
ntx[k] = add_0x(hex_uniform(strip_0x(ntx[k])))
txc = TxCache( txc = TxCache(
ntx['hash'], tx_dict['hash'],
ntx['from'], tx_dict['from'],
ntx['to'], tx_dict['to'],
ntx['source_token'], tx_dict['source_token'],
ntx['destination_token'], tx_dict['destination_token'],
ntx['from_value'], tx_dict['from_value'],
ntx['to_value'], tx_dict['to_value'],
session=session session=session
) )
@ -123,4 +112,4 @@ def cache_tx_dict(tx_dict, session=None):
SessionBase.release_session(session) SessionBase.release_session(session)
return (ntx, insert_id) return (tx_dict, insert_id)