Add session index tx query inteface

This commit is contained in:
nolash 2021-08-26 17:12:37 +02:00
parent 9d78dd055d
commit 835593acc5
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
5 changed files with 167 additions and 5 deletions

100
chaind/runnable/list.py Normal file
View File

@ -0,0 +1,100 @@
# SPDX-License-Identifier: GPL-3.0-or-later
# standard imports
import os
import logging
import sys
# external imports
from hexathon import add_0x
from chaind import Environment
import chainlib.cli
from chainlib.chain import ChainSpec
from chainqueue.db import dsn_from_config
from chainqueue.sql.backend import SQLBackend
from chaind.sql.session import SessionIndex
from chainqueue.adapters.sessionindex import SessionIndexAdapter
from chainqueue.cli import Outputter
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
script_dir = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join(script_dir, '..', 'data', 'config')
arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC
argparser = chainlib.cli.ArgumentParser(arg_flags)
argparser.add_argument('--backend', type=str, default='sql', help='Backend to use (currently only "sql")')
argparser.add_argument('--start', type=str, help='Oldest transaction hash to include in results')
argparser.add_argument('--end', type=str, help='Newest transaction hash to include in results')
argparser.add_argument('--error', action='store_true', help='Only show transactions which have error state')
argparser.add_argument('--pending', action='store_true', help='Omit finalized transactions')
argparser.add_argument('--status-mask', type=int, dest='status_mask', help='Manually specify status bitmask value to match (overrides --error and --pending)')
argparser.add_argument('--summary', action='store_true', help='output summary for each status category')
argparser.add_argument('--address', dest='address', type=str, help='filter by address')
argparser.add_positional('session_id', type=str, help='Ethereum address of recipient')
args = argparser.parse_args()
extra_args = {
'address': None,
'backend': None,
'start': None,
'end': None,
'error': None,
'pending': None,
'status_mask': None,
'summary': None,
'session_id': 'SESSION_ID',
}
env = Environment(domain='eth', env=os.environ)
config = chainlib.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)
if config.get('SESSION_DATA_DIR') == None:
config.add(env.data_dir, 'SESSION_DATA_DIR', exists_ok=True)
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
status_mask = config.get('_STATUS_MASK', None)
not_status_mask = None
if status_mask == None:
if config.get('_ERROR'):
status_mask = all_errors()
if config.get('_PENDING'):
not_status_mask = StatusBits.FINAL
tx_getter = None
session_method = None
if config.get('_BACKEND') == 'sql':
from chainqueue.sql.query import get_tx_cache as tx_getter
from chainqueue.runnable.sql import setup_backend
from chainqueue.db.models.base import SessionBase
setup_backend(config, debug=config.true('DATABASE_DEBUG'))
session_method = SessionBase.create_session
else:
raise NotImplementedError('backend {} not implemented'.format(config.get('_BACKEND')))
if config.get('DATABASE_ENGINE') == 'sqlite':
config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True)
dsn = dsn_from_config(config)
backend = SQLBackend(dsn, debug=config.true('DATABASE_DEBUG'))
session_index_backend = SessionIndex(config.get('SESSION_ID'))
adapter = SessionIndexAdapter(backend, session_index_backend=session_index_backend)
def main():
outputter = Outputter(chain_spec, sys.stdout, tx_getter, session_method=session_method, decode_status=config.true('_RAW'))
txs = session_index_backend.get(chain_spec, adapter)
if config.get('_SUMMARY'):
for k in txs.keys():
outputter.add(k)
outputter.decode_summary()
else:
for k in txs.keys():
outputter.decode_single(k)
if __name__ == '__main__':
main()

View File

@ -1,6 +1,7 @@
# external imports
from chainqueue.sql.query import get_tx
class SessionIndex:
def __init__(self, session_id):
@ -13,9 +14,22 @@ class SessionIndex:
session.flush()
def get(self, chain_spec, adapter, session=None):
def get(self, chain_spec, adapter, session=None, status=None, not_status=0, status_target=None, before=None):
session = adapter.create_session(session=session)
otxs = session.execute("SELECT tx_hash, signed_tx FROM otx WHERE otx.id = ( SELECT otx_id FROM session where session='{}')".format(self.id))
sql = "SELECT tx_hash, signed_tx FROM otx INNER JOIN tx_cache ON otx.id = tx_cache.otx_id WHERE otx.id IN ( SELECT otx_id FROM session where session='{}')".format(self.id)
if status != None:
if status_target == 0:
sql += " AND status & {} > 0".format(status)
else:
if status_target == None:
status_target = status
sql += " AND status & {} = {}".format(status, status_target)
if not_status > 0:
sql += " AND status & {} = 0".format(not_status)
if before != None:
sql += " AND tx_cache.date_checked < '{}'".format(before.isoformat())
otxs = session.execute(sql)
txs = {}
for otx in otxs:
txs[otx[0]] = otx[1]

View File

@ -0,0 +1,41 @@
# standard imports
import datetime
# external imports
from hexathon import add_0x
from chainqueue.adapters.base import Adapter
from chainqueue.enum import StatusBits
class SessionIndexAdapter(Adapter):
def __init__(self, backend, session_index_backend=None, pending_retry_threshold=0, error_retry_threshold=0):
super(SessionIndexAdapter, self).__init__(backend, pending_retry_threshold=pending_retry_threshold, error_retry_threshold=error_retry_threshold)
self.session_index_backend = session_index_backend
def add(self, bytecode, chain_spec, session=None):
tx = self.translate(bytecode, chain_spec)
r = self.backend.create(chain_spec, tx['nonce'], tx['from'], tx['hash'], add_0x(bytecode.hex()), session=session)
if r:
session.rollback()
session.close()
return None
r = self.backend.cache(tx, session=session)
if self.session_index_backend != None:
session.flush()
self.session_index_backend.add(chain_spec, tx['hash'], session=session)
session.commit()
return tx['hash']
def upcoming(self, chain_spec, session=None):
txs = self.backend.get(chain_spec, self.translate, session=session, status=StatusBits.QUEUED, not_status=StatusBits.IN_NETWORK)
before = datetime.datetime.utcnow() - self.error_retry_threshold
errored_txs = self.backend.get(chain_spec, self.translate, session=session, status=StatusBits.LOCAL_ERROR, not_status=StatusBits.FINAL, before=before, requeue=True)
for tx_hash in errored_txs.keys():
txs[tx_hash] = errored_txs[tx_hash]
return txs

View File

@ -1,5 +1,5 @@
chainlib>=0.0.9a2,<=0.1.0
chainqueue>=0.0.4a4,<=0.0.4
chainlib>=0.0.9a3,<=0.1.0
chainqueue>=0.0.4a5,<=0.0.4
chainsyncer>=0.0.6a1,<=0.0.6
confini>=0.4.1a1,<0.5.0
crypto-dev-signer>=0.4.15a1,<0.5.0

View File

@ -1,6 +1,6 @@
[metadata]
name = chaind
version = 0.0.2a4
version = 0.0.3a1
description = Base package for chain queue services
author = Louis Holbrook
author_email = dev@holbrook.no
@ -27,3 +27,10 @@ python_requires = >= 3.6
include_package_data = True
packages =
chaind
chaind.sql
chaind.runnable
chainqueue.adapters
[options.entry_points]
console_scripts =
chaind-list = chaind.runnable.list:main