diff --git a/CHANGELOG b/CHANGELOG index 1295334..4c56aa6 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,8 @@ +- 0.1.16 + * Queue list cli tool + * Provide pluggable renderer capability for queue list cli tool + * Move path and state query parsing to settings module + * Add queue path and digest parameters to base config - 0.1.15 * Upgrade shep to avoid sync in persist set - 0.1.14 diff --git a/chainqueue/cli/config.py b/chainqueue/cli/config.py index 9df7dc5..80508e5 100644 --- a/chainqueue/cli/config.py +++ b/chainqueue/cli/config.py @@ -2,6 +2,8 @@ def process_config(config, args, flags): args_override = {} args_override['QUEUE_BACKEND'] = getattr(args, 'backend') + args_override['TX_DIGEST_SIZE'] = getattr(args, 'tx_digest_size') + args_override['QUEUE_STATE_PATH'] = getattr(args, 'state_dir') config.dict_override(args_override, 'local cli args') diff --git a/chainqueue/data/config/config.ini b/chainqueue/data/config/config.ini index 1458016..ec08f0a 100644 --- a/chainqueue/data/config/config.ini +++ b/chainqueue/data/config/config.ini @@ -1,2 +1,8 @@ [queue] backend = mem +state_path = +index_path = +counter_path = + +[tx] +digest_size = 32 diff --git a/chainqueue/entry.py b/chainqueue/entry.py index 8150e87..271eaff 100644 --- a/chainqueue/entry.py +++ b/chainqueue/entry.py @@ -1,7 +1,7 @@ # standard imports import logging -# ecxternal imports +# external imports from hexathon import ( add_0x, strip_0x, @@ -44,6 +44,12 @@ class QueueEntry: return tx_hash + def local_state(self): + state = self.store.state(self.k) + state_str = self.store.name(state) + return (state, state_str,) + + def load(self): (self.k, self.signed_tx) = self.store.get(self.tx_hash) self.synced = True @@ -142,4 +148,4 @@ class QueueEntry: v = self.store.get(self.tx_hash) n = self.store.state(v[0]) s = self.store.name(n) - return '{}: {}'.format(self.tx_hash, s) + return '{}: {} ({})'.format(self.k, s, n) diff --git a/chainqueue/runnable/list.py b/chainqueue/runnable/list.py index 68fe3b9..58cf450 100644 --- a/chainqueue/runnable/list.py +++ b/chainqueue/runnable/list.py @@ -4,15 +4,19 @@ import os import logging import sys +import importlib # external imports from hexathon import add_0x import chainlib.cli from chainlib.chain import ChainSpec -from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer # local imports -from chainqueue.cli.output import Outputter +import chainqueue.cli +#from chainqueue.cli.output import Outputter +from chainqueue.settings import ChainqueueSettings +from chainqueue.store import Store +from chainqueue.entry import QueueEntry logging.basicConfig(level=logging.WARNING) @@ -21,30 +25,43 @@ logg = logging.getLogger() script_dir = os.path.dirname(os.path.realpath(__file__)) config_dir = os.path.join(script_dir, '..', 'data', 'config') -arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC +arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC | chainlib.cli.Flag.UNSAFE argparser = chainlib.cli.ArgumentParser(arg_flags) -argparser.add_argument('--backend', type=str, default='sql', help='Backend to use (currently only "sql")') -argparser.add_argument('--start', type=str, help='Oldest transaction hash to include in results') -argparser.add_argument('--end', type=str, help='Newest transaction hash to include in results') +argparser.add_argument('--backend', type=str, default='sql', help='Backend to use') +argparser.add_argument('--state-dir', type=str, dest='state_dir', help='Backend to use') +argparser.add_argument('--tx-digest-size', type=int, dest='tx_digest_size', default=32, help='Size of tx digest in bytes') +#argparser.add_argument('--session-id', type=str, dest='session_id', help='Session id to list') +#argparser.add_argument('--start', type=str, help='Oldest transaction to include in results') +#argparser.add_argument('--end', type=str, help='Newest transaction to include in results') argparser.add_argument('--error', action='store_true', help='Only show transactions which have error state') -argparser.add_argument('--pending', action='store_true', help='Omit finalized transactions') -argparser.add_argument('--status-mask', type=int, dest='status_mask', help='Manually specify status bitmask value to match (overrides --error and --pending)') -argparser.add_argument('--summary', action='store_true', help='output summary for each status category') -argparser.add_argument('-o', '--column', dest='column', action='append', type=str, help='add a column to display') +argparser.add_argument('--no-final', action='store_true', dest='no_final', help='Omit finalized transactions') +argparser.add_argument('--status-mask', type=str, dest='status_mask', action='append', default=[], help='Manually specify status bitmask value to match (overrides --error and --pending)') +argparser.add_argument('--exact', action='store_true', help='Match status exact') +argparser.add_argument('--include-pending', action='store_true', dest='include_pending', help='Include transactions in unprocessed state (pending)') +argparser.add_argument('--renderer', type=str, default=[], action='append', help='Transaction renderer for output') +#argparser.add_argument('--summary', action='store_true', help='output summary for each status category') +#argparser.add_argument('-o', '--column', dest='column', action='append', type=str, help='add a column to display') argparser.add_positional('address', type=str, help='Ethereum address of recipient') args = argparser.parse_args() extra_args = { 'address': None, 'backend': None, - 'start': None, - 'end': None, + 'state_dir': None, + 'exact': None, +# 'tx_digest_size': None, +# 'start': None, +# 'end': None, 'error': None, - 'pending': None, + 'include_pending': '_PENDING', 'status_mask': None, - 'column': None, - 'summary': None, + 'no_final': None, + 'renderer': None, +# 'column': None, +# 'summary': None, } config = chainlib.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir) +config = chainqueue.cli.config.process_config(config, args, 0) +logg.debug('config loaded:\n{}'.format(config)) chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) @@ -58,36 +75,40 @@ if status_mask == None: tx_getter = None tx_lister = None -session_method = None -if config.get('_BACKEND') == 'sql': - from chainqueue.sql.query import get_account_tx as tx_lister - from chainqueue.sql.query import get_tx_cache as tx_getter - from chainqueue.runnable.sql import setup_backend - from chainqueue.db.models.base import SessionBase - setup_backend(config, debug=config.true('DATABASE_DEBUG')) - session_method = SessionBase.create_session -else: - raise NotImplementedError('backend {} not implemented'.format(config.get('_BACKEND'))) +#output_cols = config.get('_COLUMN') -output_cols = config.get('_COLUMN') +renderers_mods = [] +for renderer in config.get('_RENDERER'): + m = importlib.import_module(renderer) + renderers_mods.append(m) + logg.info('using renderer module {}'.format(renderer)) + +settings = ChainqueueSettings() +settings.process(config) +logg.debug('settings:\n{}'.format(settings)) def main(): - since = config.get('_START', None) - if since != None: - since = add_0x(since) - until = config.get('_END', None) - if until != None: - until = add_0x(until) - txs = tx_lister(chain_spec, config.get('_ADDRESS'), since=since, until=until, status=status_mask, not_status=not_status_mask) - outputter = Outputter(chain_spec, sys.stdout, tx_getter, session_method=session_method, decode_status=config.true('_RAW'), cols=output_cols) - if config.get('_SUMMARY'): - for k in txs.keys(): - outputter.add(k) - outputter.decode_summary() - else: - for k in txs.keys(): - outputter.decode_single(k) +# since = config.get('_START', None) +# if since != None: +# since = add_0x(since) +# until = config.get('_END', None) +# if until != None: +# until = add_0x(until) +# txs = tx_lister(chain_spec, config.get('_ADDRESS'), since=since, until=until, status=status_mask, not_status=not_status_mask) + txs = settings.get('QUEUE_STORE').by_state(state=settings.get('QUEUE_STATUS_FILTER'), strict=config.get('_EXACT'), include_pending=config.get('_PENDING')) + + for i, tx_hash in enumerate(txs): + entry = QueueEntry(settings.get('QUEUE_STORE'), tx_hash) + entry.load() + v = None + if len(renderers_mods) == 0: + v = str(entry) + else: + for m in renderers_mods: + v = m.apply(i, settings, v, settings.get('CHAIN_SPEC'), entry) + print(v) + if __name__ == '__main__': main() diff --git a/chainqueue/settings.py b/chainqueue/settings.py index e048a26..eaeb21a 100644 --- a/chainqueue/settings.py +++ b/chainqueue/settings.py @@ -1,8 +1,86 @@ +# standard imports +import os +import logging + # external imports from chainlib.settings import ChainSettings +from chainqueue.state import Status +from chainqueue.store import Store + +logg = logging.getLogger(__name__) class ChainqueueSettings(ChainSettings): + def process_queue_tx(self, config): + self.o['TX_DIGEST_SIZE'] = config.get('TX_DIGEST_SIZE') + + def process_queue_backend(self, config): self.o['QUEUE_BACKEND'] = config.get('QUEUE_BACKEND') + self.o['QUEUE_STATE_STORE'] = Status(self.o['QUEUE_STORE_FACTORY'], allow_invalid=True) + self.o['QUEUE_STORE'] = Store( + self.o['CHAIN_SPEC'], + self.o['QUEUE_STATE_STORE'], + self.o['QUEUE_INDEX_STORE'], + self.o['QUEUE_COUNTER_STORE'], + sync=True, + ) + + + def process_queue_paths(self, config): + index_dir = config.get('QUEUE_INDEX_PATH') + if index_dir == None: + index_dir = os.path.join(config.get('QUEUE_STATE_PATH'), 'tx') + + counter_dir = config.get('QUEUE_COUNTER_PATH') + if counter_dir == None: + counter_dir = os.path.join(config.get('QUEUE_STATE_PATH')) + + self.o['QUEUE_STATE_PATH'] = config.get('QUEUE_STATE_PATH') + self.o['QUEUE_INDEX_PATH'] = index_dir + self.o['QUEUE_COUNTER_PATH'] = counter_dir + + + def process_queue_backend_fs(self, config): + from chainqueue.store.fs import IndexStore + from chainqueue.store.fs import CounterStore + from shep.store.file import SimpleFileStoreFactory + index_store = IndexStore(self.o['QUEUE_INDEX_PATH'], digest_bytes=self.o['TX_DIGEST_SIZE']) + counter_store = CounterStore(self.o['QUEUE_COUNTER_PATH']) + factory = SimpleFileStoreFactory(self.o['QUEUE_STATE_PATH'], use_lock=True).add + + self.o['QUEUE_INDEX_STORE'] = index_store + self.o['QUEUE_COUNTER_STORE'] = counter_store + self.o['QUEUE_STORE_FACTORY'] = factory + + + def process_queue_status_filter(self, config): + states = 0 + if len(config.get('_STATUS_MASK')) == 0: + for v in self.o['QUEUE_STATE_STORE'].all(numeric=True): + states |= v + logg.debug('state store {}'.format(states)) + else: + for v in config.get('_STATUS_MASK'): + try: + states |= int(v) + continue + except ValueError: + pass + + state = self.o['QUEUE_STATE_STORE'].from_name(v) + logg.debug('resolved state argument {} to numeric state {}'.format(v, state)) + states |= state + + self.o['QUEUE_STATUS_FILTER'] = states + + + def process(self, config): + super(ChainqueueSettings, self).process(config) + self.process_queue_tx(config) + self.process_queue_paths(config) + if config.get('_BACKEND') == 'fs': + self.process_queue_backend_fs(config) + self.process_queue_backend(config) + self.process_queue_status_filter(config) diff --git a/chainqueue/state.py b/chainqueue/state.py index d82805f..9e983a1 100644 --- a/chainqueue/state.py +++ b/chainqueue/state.py @@ -105,11 +105,13 @@ class Verify: class Status(shep.persist.PersistedState): + + bits = 12 def __init__(self, store_factory, allow_invalid=False, event_callback=None): verify = Verify().verify self.set_default_state('PENDING') - super(Status, self).__init__(store_factory, 12, verifier=verify, check_alias=not allow_invalid, event_callback=event_callback) + super(Status, self).__init__(store_factory, self.bits, verifier=verify, check_alias=not allow_invalid, event_callback=event_callback) self.add('QUEUED') self.add('RESERVED') self.add('IN_NETWORK') diff --git a/chainqueue/store/base.py b/chainqueue/store/base.py index 4b3b9a4..4ed8ee5 100644 --- a/chainqueue/store/base.py +++ b/chainqueue/store/base.py @@ -91,13 +91,24 @@ class Store: return (s, v,) - def by_state(self, state=0, not_state=0, limit=4096, strict=False, threshold=None): + def by_state(self, state=0, not_state=0, include_pending=False, limit=4096, strict=False, threshold=None): hashes = [] i = 0 + + refs_state = [] + if state > 0: + if self.state_store.is_pure(state): + refs_state = self.state_store.list(state) + elif strict: + refs_state = self.state_store.list(state) + else: + for v in self.state_store.elements(state, numeric=True): + refs_state += self.state_store.list(v) + if include_pending: + refs_state += self.state_store.list(0) - refs_state = self.state_store.list(state) refs_state.sort() - + for ref in refs_state: v = from_key(ref) hsh = v[2] @@ -125,7 +136,7 @@ class Store: if limit > 0 and i == limit: break - hashes.sort() + #hashes.sort() return hashes diff --git a/requirements.txt b/requirements.txt index c8e5ba6..4aeccb1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,5 +5,5 @@ leveldir~=0.3.0 #SQLAlchemy==1.3.20 confini~=0.6.0 #pyxdg~=0.27 -chainlib~=0.1.1 -shep~=0.2.8 +chainlib~=0.1.3 +shep~=0.2.9 diff --git a/setup.cfg b/setup.cfg index f2c7b81..c350d6f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainqueue -version = 0.1.15 +version = 0.1.16 description = Generic blockchain transaction queue control author = Louis Holbrook author_email = dev@holbrook.no