Compare commits

...

21 Commits

Author SHA1 Message Date
lash
5cd52cbb97 Change license to APL3, waive copyright 2022-11-14 07:29:18 +00:00
lash
1b24fd5439 Implement shep 0.3.x, chainlib 0.4.x 2022-11-06 15:17:55 +00:00
lash
b7a84acdb4 Bump minor version 2022-05-14 16:29:02 +00:00
lash
b7953cbd0f Add data module 2022-05-14 12:38:10 +00:00
lash
c5bd4aad3a Update cli module for chainlib 0.3.0, fix remaining settings processing 2022-05-13 13:46:30 +00:00
lash
c45e6e3310 Implement settings on chainlib 0.3.0 structure 2022-05-13 09:29:54 +00:00
lash
97fbec6477 Rehabilitate tests after by state signature change 2022-05-09 19:40:41 +00:00
lash
2a9bf41cf5 Deduplicate multiple matches in by-state list 2022-05-06 14:00:05 +00:00
lash
c54661d39e Add prerelease version 2022-05-06 08:47:51 +00:00
lash
ddeae91611 Add state parser cli tool 2022-05-06 08:47:29 +00:00
lash
d94cf08719 Rehabilitate list tool, enhance settings and config 2022-05-06 08:30:14 +00:00
lash
a0f8960643 Update changelog 2022-05-05 17:06:30 +00:00
lash
ce0f29d982 Upgrade shep, omit sync on persist set 2022-05-05 17:03:19 +00:00
lash
263d4df300 Update changelog 2022-05-05 15:42:08 +00:00
lash
029deead75 Upgrade shep to handle exception in filestore list 2022-05-05 15:37:19 +00:00
lash
da9fb5925d Upgrade shep 2022-05-05 15:11:01 +00:00
lash
cbf00281c6 Remove sync for each get 2022-05-05 14:22:21 +00:00
lash
01ad409077 Raise correct error in index store exists check 2022-05-04 18:37:02 +00:00
lash
3a8ec01588 Allow for sync skip in queue store instantiation 2022-05-04 05:44:47 +00:00
lash
b63793fd9b Add purge to chainqueue store object 2022-05-02 20:21:51 +00:00
lash
84b8eb10e6 Remove spam logline 2022-05-01 07:40:32 +00:00
20 changed files with 360 additions and 264 deletions

View File

@@ -1,3 +1,35 @@
- 0.2.2
* Change license to AGPL3 and waive copyright to public domain
- 0.2.1
* Implement shep 0.3.0
* Implement chainlib 0.4.x
- 0.2.0
* Implement chainlib 0.3.0
- 0.1.16
* Queue list cli tool
* State parser cli tool
* Provide pluggable renderer capability for queue list cli tool
* Move path and state query parsing to settings module
* Add queue path and digest parameters to base config
- 0.1.15
* Upgrade shep to avoid sync in persist set
- 0.1.14
* Upgrade shep to handle exception in filestore list
- 0.1.13
* Remove sync on each get
* Upgrade shep to guarantee atomic state lock state
- 0.1.12
* Raise correct exception from index store exists check
- 0.1.11
* Allow for sync skip in store instantiation
- 0.1.10
* Improve logging
- 0.1.9
* Upgrade deps
- 0.1.8
* Upgrade deps
- 0.1.7
* Improve logging
- 0.1.6
* Sort upcoming queue item chronologically
* Add unit testing for upcoming query method

View File

@@ -1 +1 @@
include *requirements.txt LICENSE chainqueue/db/migrations/default/* chainqueue/db/migrations/default/versions/* chainqueue/db/migrations/default/versions/src/* chainqueue/data/config/*
include *requirements.txt LICENSE WAIVER WAIVER.asc CHANGELOG chainqueue/db/migrations/default/* chainqueue/db/migrations/default/versions/* chainqueue/db/migrations/default/versions/src/* chainqueue/data/config/*

17
WAIVER Normal file
View File

@@ -0,0 +1,17 @@
# Copyright waiver for the python package "chainqueue"
I dedicate any and all copyright interest in this software to the
public domain. I make this dedication for the benefit of the public at
large and to the detriment of my heirs and successors. I intend this
dedication to be an overt act of relinquishment in perpetuity of all
present and future rights to this software under copyright law.
To the best of my knowledge and belief, my contributions are either
originally authored by me or are derived from prior works which I have
verified are also in the public domain and are not subject to claims
of copyright by other parties.
To the best of my knowledge and belief, no individual, business,
organization, government, or other entity has any copyright interest
in my contributions, and I affirm that I will not make contributions
that are otherwise encumbered.

29
WAIVER.asc Normal file
View File

@@ -0,0 +1,29 @@
-----BEGIN PGP MESSAGE-----
owGVU3tQFHUcvztPxAW1KZNp4vETfJUXaCgRoiQIeGkEiIBOJrt7v7v7cXe7yz64
LhDGfPAoKmi0A8dHmRQlA4xnIeTENT2YkhzIkEc1XlHkCIeTSo+5iX67B2r1V3/c
zO3+vt/P87evzpulClQv7hvtrPz47TR18yyeCsjbqM9NzaaLxj+KAiks5+CRySwC
O4mKIQ+MLA9EMwScQzSzDOBI2kKaIIikzSRiiiQowUiC0AMDNCCaFCEgGQf+GQBp
tQL6NhhiRMhDQf6D0ZAABNYo2kkeApGV4QlOoqyIBgbWhmGjgR7YSAv0j05DI8w+
I4aCDDQiEbBGvzb/MikSVpI3QYXfj4uXRR7ZIKPM2hzADBEvKAOCRNNQEFhekOlk
gfIWJiTuIsQolIwHWJyFCEhaweGhVfaOBLOCjD1xkOegKCHRIZ9j7wSH7cqHMpVR
EiVsVYlC8Cu7OwKJMeCg74RlJe3RBJHDTlsVZrRbGNZuhYZpgxQWAY06+YBmGeyS
kmTJ2ByGhAjv8gSLARGD5eBOJNwfD/GeA9ggwEHKc5gYt4wV8qwNcDzCr+0sbxGA
3YxoM87FTBZDAntHRoTH5BXSKrD+Gm8H72/NXzueYFgRp0sVQpwWNktbSWQTCOzh
jkUsgpUV4vvEiwgK/8MvI7MbUDEySKRVByhJQAzuUYfNmkgGPa8UpwMmuTFG7kcn
m/Wz4Se5IjMpKPf0v/eTwDb+HahOodcD0mhEvA2LJEX8ZEf4gstOlYv6jwVCGZGT
UFjtSMCFMLRkozCHIZqo0sRqVepAVdiixdrAZeNvcI+EV3z2Q0XWzFc5WyN/iypi
7j0zb5JjFkxtzw3Ze2SFIdU192jctk7fS5u7J7qbvloaVRHS8XXt2Cde32+Tq4eu
tpz0ZI6csJwrG6gY/X54cLT0/l+butJ8aOLyqZjBd9Z+9+k6b15NeOm16JWxwZ09
Ne9rD6lfpi62ORtLhOX1I5q2xvFvvnhlfuLCa+VrHo59ZkDzRKj7mHt9YnN2lfYM
ashqWpLyQHlvxvzE9qSmi5k7+9eeKtBcab1x4nq8yRtZPRaz0FVTsOWpnHlPX1ab
g1Z9uyW2uC3GVUVpbzyaUP5LfVJyxINHJnoGrfp2XWj2a40FtYdu9oTteiG10t36
Yq73w9VLb1aMhs1putosUYf7un5ekORtpPdtvbD1fF9HUG2n7UpR+gr1+fVs1s7D
hVROiNsUc4kuOdDb8lCE1Du4YZORI/S7XXXtUZungj0NjZXHb3mmPIk/Hs3fVVA2
4HTOOTu748uGeovxOjdS9+Zf5fUB3SX3habmfw4SawMdnu4z1X0qX+dbf9hsaVmu
jncDjxlTTqdb9jjiziE3uTx+E1Aty3iv9INWV513X7hTuLUoklRFJE/sWDnslvqT
ug6M7n/8bMNkW0vc7xt0zu2PbQvYfVz75HA6CNpzYSiljA2wcoMZYxt/sqw6+OcO
Pj5p6PXM59Ytydib0H8pqNdXWVm9ZlKjPjl177O+BE9ATiGfuz+4KD3f2SflVZWn
nuaEvwE=
=lGEV
-----END PGP MESSAGE-----

View File

@@ -1,153 +0,0 @@
# standard imports
import logging
import enum
# external imports
from hexathon import add_0x
# local imports
from chainqueue.enum import (
StatusBits,
all_errors,
is_alive,
is_error_status,
status_str,
)
logg = logging.getLogger(__name__)
class OutputCol(enum.Enum):
    """Identifiers for the listing output columns.

    The numeric value doubles as the index into the value list built in
    Outputter.decode_single, so the order here must stay in sync with
    that list.
    """
    chainspec = 0
    hash = 1
    statustext = 2
    statuscode = 3
    signedtx = 4
class Outputter:
    """Output helper for chainqueue cli listings tools.

    :param chain_spec: Chain spec to use as getter context
    :type chain_spec: chainlib.chain.ChainSpec
    :param writer: Writer to write output to. Will automatically flush.
    :type writer: Writer
    :param getter: Transaction getter
    :type getter: See chainqueue.sql.backend.get_otx
    :param session_method: Backend session generator method
    :type session_method: varies
    :param decode_status: Print status bit details
    :type decode_status: bool
    """

    # Every known column; order mirrors the OutputCol numeric values.
    all_cols = [
            OutputCol.chainspec,
            OutputCol.hash,
            OutputCol.signedtx,
            OutputCol.statustext,
            OutputCol.statuscode,
            ]

    # Columns used when the caller selects none (signed tx omitted).
    default_cols = [
            OutputCol.chainspec,
            OutputCol.hash,
            OutputCol.statustext,
            OutputCol.statuscode,
            ]

    def __init__(self, chain_spec, writer, getter, session_method=None, decode_status=True, cols=None):
        self.decode_status = decode_status
        self.writer = writer
        self.getter = getter
        self.chain_spec = chain_spec
        self.chain_spec_str = str(chain_spec)
        # open a backend session only when a generator method is supplied
        self.session = None
        if session_method != None:
            self.session = session_method()
        # tally of transactions per category, consumed by decode_summary
        self.results = {
            'pending_error': 0,
            'final_error': 0,
            'pending': 0,
            'final': 0,
            }
        # resolve column selection; string entries name OutputCol members
        debug_col_name = []
        if cols == None:
            self.cols = Outputter.default_cols
        else:
            self.cols = []
            for col in cols:
                v = getattr(OutputCol, col)
                self.cols.append(v)
        for col in self.cols:
            debug_col_name.append(col.name)
        logg.debug('outputter initialized with cols: {}'.format(','.join(debug_col_name)))

    def __del__(self):
        # close the backend session opened in the constructor, if any
        if self.session != None:
            self.session.close()

    def add(self, tx_hash):
        """Retrieve a transaction by hash and add it for summary output generation.

        :param tx_hash: Transaction hash
        :type tx_hash: str
        """
        tx = self.getter(self.chain_spec, tx_hash, session=self.session)
        self.__add(tx)

    def __add(self, tx):
        # categorize by liveness; error states are tallied separately
        category = None
        if is_alive(tx['status_code']):
            category = 'pending'
        else:
            category = 'final'
        self.results[category] += 1
        if is_error_status(tx['status_code']):
            logg.debug('registered {} as {} with error'.format(tx['tx_hash'], category))
            self.results[category + '_error'] += 1
        else:
            logg.debug('registered {} as {}'.format(tx['tx_hash'], category))

    def decode_summary(self):
        """Writes summary to the registered writer.

        One tab-separated line per category (pending, final, total),
        each carrying the count and the error count.
        """
        self.writer.write('pending\t{}\t{}\n'.format(self.results['pending'], self.results['pending_error']))
        self.writer.write('final\t{}\t{}\n'.format(self.results['final'], self.results['final_error']))
        self.writer.write('total\t{}\t{}\n'.format(self.results['final'] + self.results['pending'], self.results['final_error'] + self.results['pending_error']))

    def decode_single(self, tx_hash):
        """Retrieves the transaction with the given hash and writes the details to the underlying writer.

        Registers the transaction with the summary generator.

        :param tx_hash: Transaction hash
        :type tx_hash: str
        """
        tx = self.getter(self.chain_spec, tx_hash, session=self.session)
        self.__add(tx)
        status = tx['status']
        if self.decode_status:
            status = status_str(tx['status_code'], bits_only=True)
        # order must match the OutputCol numeric values, which are used
        # as indices below
        vals = [
                self.chain_spec_str,
                add_0x(tx_hash),
                status,
                str(tx['status_code']),
                add_0x(tx['signed_tx']),
                ]
        # selected columns, tab-separated, newline after the last one
        i = 0
        l = len(self.cols)
        for col in self.cols:
            self.writer.write(vals[col.value])
            i += 1
            if i == l:
                self.writer.write('\n')
            else:
                self.writer.write('\t')

View File

@@ -1,11 +0,0 @@
# standard imports
import os
# local imports
from .arg import process_flags
from .config import process_config
__script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(os.path.dirname(__script_dir), 'data')
config_dir = os.path.join(data_dir, 'config')

View File

@@ -1,2 +1,8 @@
def process_flags(argparser, flags):
    """Attach the chainqueue command line arguments to a parser.

    :param argparser: Argument parser to extend
    :type argparser: argparse.ArgumentParser
    :param flags: Flag bitmask selecting argument groups (currently unused)
    :type flags: int
    """
    backend_help = 'Backend to use for state store'
    argparser.add_argument('--backend', type=str, help=backend_help)
def apply_flag(flag):
    """Register the queue flag group on the given flag collection.

    The collection is mutated in place and also returned, so the call
    can be chained.

    :param flag: Flag collection exposing an ``add`` method
    :returns: The same flag collection
    """
    flag.add('queue')
    return flag
def apply_arg(arg):
    """Register chainqueue long-form arguments under the 'queue' group.

    The collection is mutated in place and also returned, so the call
    can be chained.

    :param arg: Argument collection exposing an ``add_long`` method
    :returns: The same argument collection
    """
    arg.add_long('tx-digest-size', 'queue', type=int, help='Size of transaction hash in bytes')
    return arg

View File

@@ -2,6 +2,8 @@ def process_config(config, args, flags):
args_override = {}
args_override['QUEUE_BACKEND'] = getattr(args, 'backend')
args_override['TX_DIGEST_SIZE'] = getattr(args, 'tx_digest_size')
args_override['QUEUE_STATE_PATH'] = getattr(args, 'state_dir')
config.dict_override(args_override, 'local cli args')

View File

@@ -1,2 +1,8 @@
[queue]
backend = mem
state_path =
index_path =
counter_path =
[tx]
digest_size = 32

View File

@@ -1,7 +1,7 @@
# standard imports
import logging
# ecxternal imports
# external imports
from hexathon import (
add_0x,
strip_0x,
@@ -44,6 +44,12 @@ class QueueEntry:
return tx_hash
def local_state(self):
state = self.store.state(self.k)
state_str = self.store.name(state)
return (state, state_str,)
def load(self):
(self.k, self.signed_tx) = self.store.get(self.tx_hash)
self.synced = True
@@ -142,4 +148,4 @@ class QueueEntry:
v = self.store.get(self.tx_hash)
n = self.store.state(v[0])
s = self.store.name(n)
return '{}: {}'.format(self.tx_hash, s)
return '{}: {} ({})'.format(self.k, s, n)

View File

@@ -4,15 +4,19 @@
import os
import logging
import sys
import importlib
# external imports
from hexathon import add_0x
import chainlib.cli
from chainlib.chain import ChainSpec
from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer
# local imports
from chainqueue.cli.output import Outputter
import chainqueue.cli
#from chainqueue.cli.output import Outputter
from chainqueue.settings import ChainqueueSettings
from chainqueue.store import Store
from chainqueue.entry import QueueEntry
logging.basicConfig(level=logging.WARNING)
@@ -21,30 +25,31 @@ logg = logging.getLogger()
script_dir = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join(script_dir, '..', 'data', 'config')
arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC
arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC | chainlib.cli.Flag.UNSAFE
argparser = chainlib.cli.ArgumentParser(arg_flags)
argparser.add_argument('--backend', type=str, default='sql', help='Backend to use (currently only "sql")')
argparser.add_argument('--start', type=str, help='Oldest transaction hash to include in results')
argparser.add_argument('--end', type=str, help='Newest transaction hash to include in results')
argparser.add_argument('--backend', type=str, default='sql', help='Backend to use')
argparser.add_argument('--error', action='store_true', help='Only show transactions which have error state')
argparser.add_argument('--pending', action='store_true', help='Omit finalized transactions')
argparser.add_argument('--status-mask', type=int, dest='status_mask', help='Manually specify status bitmask value to match (overrides --error and --pending)')
argparser.add_argument('--summary', action='store_true', help='output summary for each status category')
argparser.add_argument('-o', '--column', dest='column', action='append', type=str, help='add a column to display')
argparser.add_positional('address', type=str, help='Ethereum address of recipient')
argparser.add_argument('--no-final', action='store_true', dest='no_final', help='Omit finalized transactions')
argparser.add_argument('--status-mask', type=str, dest='status_mask', action='append', default=[], help='Manually specify status bitmask value to match (overrides --error and --pending)')
argparser.add_argument('--exact', action='store_true', help='Match status exact')
argparser.add_argument('--include-pending', action='store_true', dest='include_pending', help='Include transactions in unprocessed state (pending)')
argparser.add_argument('--renderer', type=str, default=[], action='append', help='Transaction renderer for output')
argparser.add_positional('address', required=False, type=str, help='Ethereum address of recipient')
args = argparser.parse_args()
extra_args = {
'address': None,
'backend': None,
'start': None,
'end': None,
'state_dir': None,
'exact': None,
'error': None,
'pending': None,
'include_pending': '_PENDING',
'status_mask': None,
'column': None,
'summary': None,
'no_final': None,
'renderer': None,
}
config = chainlib.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)
config = chainqueue.cli.config.process_config(config, args, 0)
logg.debug('config loaded:\n{}'.format(config))
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
@@ -58,36 +63,40 @@ if status_mask == None:
tx_getter = None
tx_lister = None
session_method = None
if config.get('_BACKEND') == 'sql':
from chainqueue.sql.query import get_account_tx as tx_lister
from chainqueue.sql.query import get_tx_cache as tx_getter
from chainqueue.runnable.sql import setup_backend
from chainqueue.db.models.base import SessionBase
setup_backend(config, debug=config.true('DATABASE_DEBUG'))
session_method = SessionBase.create_session
else:
raise NotImplementedError('backend {} not implemented'.format(config.get('_BACKEND')))
#output_cols = config.get('_COLUMN')
output_cols = config.get('_COLUMN')
renderers_mods = []
for renderer in config.get('_RENDERER'):
m = importlib.import_module(renderer)
renderers_mods.append(m)
logg.info('using renderer module {}'.format(renderer))
settings = ChainqueueSettings()
settings.process(config)
logg.debug('settings:\n{}'.format(settings))
def main():
since = config.get('_START', None)
if since != None:
since = add_0x(since)
until = config.get('_END', None)
if until != None:
until = add_0x(until)
txs = tx_lister(chain_spec, config.get('_ADDRESS'), since=since, until=until, status=status_mask, not_status=not_status_mask)
outputter = Outputter(chain_spec, sys.stdout, tx_getter, session_method=session_method, decode_status=config.true('_RAW'), cols=output_cols)
if config.get('_SUMMARY'):
for k in txs.keys():
outputter.add(k)
outputter.decode_summary()
else:
for k in txs.keys():
outputter.decode_single(k)
# since = config.get('_START', None)
# if since != None:
# since = add_0x(since)
# until = config.get('_END', None)
# if until != None:
# until = add_0x(until)
# txs = tx_lister(chain_spec, config.get('_ADDRESS'), since=since, until=until, status=status_mask, not_status=not_status_mask)
txs = settings.get('QUEUE_STORE').by_state(state=settings.get('QUEUE_STATUS_FILTER'), strict=config.get('_EXACT'), include_pending=config.get('_PENDING'))
for i, tx_hash in enumerate(txs):
entry = QueueEntry(settings.get('QUEUE_STORE'), tx_hash)
entry.load()
v = None
if len(renderers_mods) == 0:
v = str(entry)
else:
for m in renderers_mods:
v = m.apply(i, settings, v, settings.get('CHAIN_SPEC'), entry)
print(v)
if __name__ == '__main__':
main()

View File

@@ -1,14 +0,0 @@
# standard imports
import logging
# local imports
from chainqueue.db.models.base import SessionBase
from chainqueue.db import dsn_from_config
logg = logging.getLogger(__name__)
def setup_backend(config, debug=False):
    """Connect the SQL session backend using a DSN built from config.

    :param config: Configuration from which the DSN is derived
    :param debug: Enable SQL debug output on the connection
    :type debug: bool
    """
    dsn = dsn_from_config(config)
    logg.debug('dsn {}'.format(dsn))
    SessionBase.connect(dsn, debug=debug)

View File

@@ -0,0 +1,51 @@
# SPDX-License-Identifier: GPL-3.0-or-later
# standard imports
import os
import logging
import argparse
# local imports
from chainqueue.state import Status
# Command line definition; parsed at import time since this is a
# single-purpose script entry point.
argparser = argparse.ArgumentParser()
argparser.add_argument('-r', '--raw', dest='r', action='store_true', help='Always print pure state element strings')
argparser.add_argument('state', type=str, help='State to interpret')
args = argparser.parse_args()

# Status interpreter detached from any store (factory is None);
# allow_invalid presumably relaxes alias checking so arbitrary state
# values can be interpreted -- confirm against chainqueue.state.Status.
status_interpreter = Status(None, allow_invalid=True)
def handle_numeric(v, elements=False):
    """Translate a numeric state value to its symbolic representation.

    :param v: Numeric state value
    :type v: int
    :param elements: If set, decompose a composite (non-pure) state into
        its individual element names instead of resolving an alias name.
    :type elements: bool
    :returns: Symbolic state name, or element decomposition
    """
    if elements:
        # pure states have no composition; fall through to name lookup
        if not status_interpreter.is_pure(v):
            return status_interpreter.elements(v)
    return status_interpreter.name(v)
def handle_string(v):
    """Resolve a state name string to its numeric state value.

    Tries a direct alias lookup first, falling back to interpreting the
    string as a composition of state elements.

    :param v: State name or element composition string
    :type v: str
    :returns: Numeric state value
    """
    try:
        return status_interpreter.from_name(v)
    except AttributeError:
        # not a known alias; try it as an element composition instead
        return status_interpreter.from_elements(v)
def main():
    """Interpret the state argument and print its translation.

    A numeric argument is translated to its symbolic representation,
    any other string is resolved to its numeric state value.
    """
    v = None
    numeric = False
    try:
        v = int(args.state)
        numeric = True
    except ValueError:
        # not an integer literal; treat the input as a state name.
        # (was a bare "except:", which would also have masked unrelated
        # errors such as KeyboardInterrupt)
        v = args.state
    r = None
    if numeric:
        r = handle_numeric(v, elements=args.r)
    else:
        r = handle_string(v)
    print(r)


if __name__ == '__main__':
    main()

View File

@@ -1,8 +1,99 @@
# standard imports
import os
import logging
# external imports
from chainlib.settings import ChainSettings
from chainqueue.state import Status
from chainqueue.store import Store
logg = logging.getLogger(__name__)
class ChainqueueSettings(ChainSettings):
    """Chainqueue-specific settings processing on top of chainlib's
    ChainSettings.

    NOTE(review): the processors below use ``settings`` (or ``self``) in
    the instance slot, yet process_queue invokes its siblings as bare
    names -- that only resolves if they are module-level functions.
    Confirm the intended structure (indentation was lost in transit).
    """

    def process_queue_tx(settings, config):
        # carry the transaction digest size over from config
        settings.set('TX_DIGEST_SIZE', config.get('TX_DIGEST_SIZE'))
        return settings

    def process_queue_backend(self, config):
        # NOTE(review): writes the backing dict directly instead of
        # settings.set like the siblings, and returns None -- the caller
        # in process_queue reassigns its result to ``settings``, which
        # would clobber it with None. Confirm and fix upstream.
        self.o['QUEUE_BACKEND'] = config.get('QUEUE_BACKEND')

    def process_queue_store(settings, config):
        # wrap the store factory in the chainqueue status state machine
        status = Status(settings.get('QUEUE_STORE_FACTORY'), allow_invalid=True)
        settings.set('QUEUE_STATE_STORE', status)
        store = Store(
                settings.get('CHAIN_SPEC'),
                settings.get('QUEUE_STATE_STORE'),
                settings.get('QUEUE_INDEX_STORE'),
                settings.get('QUEUE_COUNTER_STORE'),
                sync=True,
                )
        settings.set('QUEUE_STORE', store)
        return settings

    def process_queue_paths(settings, config):
        # index and counter paths default to locations under the state path
        index_dir = config.get('QUEUE_INDEX_PATH')
        if index_dir == None:
            index_dir = os.path.join(config.get('STATE_PATH'), 'tx')
        counter_dir = config.get('QUEUE_COUNTER_PATH')
        if counter_dir == None:
            counter_dir = os.path.join(config.get('STATE_PATH'))
        # NOTE(review): reads config key STATE_PATH while the cli arg
        # override populates QUEUE_STATE_PATH -- confirm the intended key.
        settings.set('QUEUE_STATE_PATH', config.get('STATE_PATH'))
        settings.set('QUEUE_INDEX_PATH', index_dir)
        settings.set('QUEUE_COUNTER_PATH', counter_dir)
        return settings

    def process_queue_backend_fs(settings, config):
        # filesystem backend; imports deferred so other backends do not
        # pull in these dependencies
        from chainqueue.store.fs import IndexStore
        from chainqueue.store.fs import CounterStore
        from shep.store.file import SimpleFileStoreFactory
        index_store = IndexStore(settings.o['QUEUE_INDEX_PATH'], digest_bytes=int(settings.o['TX_DIGEST_SIZE']))
        counter_store = CounterStore(settings.o['QUEUE_COUNTER_PATH'])
        factory = SimpleFileStoreFactory(settings.o['QUEUE_STATE_PATH'], use_lock=True).add
        settings.set('QUEUE_INDEX_STORE', index_store)
        settings.set('QUEUE_COUNTER_STORE', counter_store)
        settings.set('QUEUE_STORE_FACTORY', factory)
        return settings

    def process_queue_status_filter(settings, config):
        # build the bitmask of states to match; defaults to all states
        states = 0
        store = settings.get('QUEUE_STATE_STORE')
        if len(config.get('_STATUS_MASK')) == 0:
            for v in store.all(numeric=True):
                states |= v
            logg.debug('state store {}'.format(states))
        else:
            for v in config.get('_STATUS_MASK'):
                try:
                    # numeric mask values are OR'ed in directly
                    states |= int(v)
                    continue
                except ValueError:
                    pass
                # otherwise resolve the symbolic name to its numeric value
                state = store.from_name(v)
                logg.debug('resolved state argument {} to numeric state {}'.format(v, state))
                states |= state
        settings.set('QUEUE_STATUS_FILTER', states)
        return settings

    def process_queue(settings, config):
        # NOTE(review): sibling processors are called as bare names;
        # see the class-level note on intended structure.
        settings = process_queue_tx(settings, config)
        settings = process_queue_paths(settings, config)
        if config.get('QUEUE_BACKEND') == 'fs':
            settings = process_queue_backend_fs(settings, config)
        settings = process_queue_backend(settings, config)
        settings = process_queue_store(settings, config)
        settings = process_queue_status_filter(settings, config)
        return settings

    def process_settings(settings, config):
        # run the base chainlib settings processing first
        super(ChainqueueSettings, settings).process(config)
        # NOTE(review): a bound call passes the instance implicitly in
        # addition to the explicit settings argument -- confirm arity.
        settings = settings.process_queue(settings, config)
        return settings

View File

@@ -9,7 +9,7 @@ logg = logging.getLogger(__name__)
class Verify:
def verify(self, state_store, from_state, to_state):
def verify(self, state_store, key, from_state, to_state):
to_state_name = state_store.name(to_state)
m = None
try:
@@ -105,11 +105,13 @@ class Verify:
class Status(shep.persist.PersistedState):
bits = 12
def __init__(self, store_factory, allow_invalid=False, event_callback=None):
verify = Verify().verify
self.set_default_state('PENDING')
super(Status, self).__init__(store_factory, 12, verifier=verify, check_alias=not allow_invalid, event_callback=event_callback)
super(Status, self).__init__(store_factory, self.bits, verifier=verify, check_alias=not allow_invalid, event_callback=event_callback)
self.add('QUEUED')
self.add('RESERVED')
self.add('IN_NETWORK')

View File

@@ -29,7 +29,7 @@ all_local_errors = all_errors() - StatusBits.NETWORK_ERROR
re_u = r'^[^_][_A-Z]+$'
class Store:
def __init__(self, chain_spec, state_store, index_store, counter, cache=None):
def __init__(self, chain_spec, state_store, index_store, counter, cache=None, sync=True):
self.chain_spec = chain_spec
self.cache = cache
self.state_store = state_store
@@ -47,9 +47,13 @@ class Store:
'unset',
'name',
'modified',
'purge',
]:
setattr(self, v, getattr(self.state_store, v))
if not sync:
return
sync_err = None
try:
self.state_store.sync()
@@ -79,7 +83,6 @@ class Store:
s = self.index_store.get(k)
err = None
try:
self.state_store.sync()
v = self.state_store.get(s)
except FileNotFoundError as e:
err = e
@@ -88,13 +91,25 @@ class Store:
return (s, v,)
def by_state(self, state=0, not_state=0, limit=4096, strict=False, threshold=None):
def by_state(self, state=0, not_state=0, include_pending=False, limit=4096, strict=False, threshold=None):
hashes = []
i = 0
refs_state = []
if state > 0:
if self.state_store.is_pure(state):
refs_state = self.state_store.list(state)
elif strict:
refs_state = self.state_store.list(state)
else:
for v in self.state_store.elements(state, numeric=True):
refs_state += self.state_store.list(v)
refs_state = list(set(refs_state))
if include_pending:
refs_state += self.state_store.list(0)
refs_state = self.state_store.list(state)
refs_state.sort()
for ref in refs_state:
v = from_key(ref)
hsh = v[2]
@@ -105,10 +120,12 @@ class Store:
if item_state & state != item_state:
continue
logg.info('state {} {}'.format(ref, item_state))
if item_state & not_state > 0:
continue
item_state_str = self.state_store.name(item_state)
logg.info('state {} {} ({})'.format(ref, item_state_str, item_state))
if threshold != None:
v = self.state_store.modified(ref)
if v > threshold:
@@ -120,7 +137,7 @@ class Store:
if limit > 0 and i == limit:
break
hashes.sort()
#hashes.sort()
return hashes
@@ -144,7 +161,7 @@ class Store:
def pending(self, limit=4096):
return self.by_state(state=0, limit=limit, strict=True)
return self.by_state(include_pending=True, limit=limit, strict=True)
def reserve(self, k):

View File

@@ -6,7 +6,10 @@ import logging
from leveldir.hex import HexDir
# local imports
from chainqueue.error import DuplicateTxError
from chainqueue.error import (
DuplicateTxError,
NotLocalTxError,
)
logg = logging.getLogger(__name__)
@@ -22,7 +25,7 @@ class IndexStore(HexDir):
existing = None
try:
existing = self.get(k)
except FileNotFoundError:
except NotLocalTxError:
pass
return existing != None
@@ -37,7 +40,14 @@ class IndexStore(HexDir):
def get(self, k):
fp = self.store.to_filepath(k)
f = open(fp, 'rb')
f = None
err = None
try:
f = open(fp, 'rb')
except FileNotFoundError as e:
err = e
if err != None:
raise NotLocalTxError(err)
v = f.read()
f.close()
return v.decode('utf-8')
@@ -64,7 +74,7 @@ class CounterStore:
v = f.read(8)
self.count = int.from_bytes(v, byteorder='big')
logg.info('counter starts at {}'.format(self.count))
logg.debug('counter starts at {}'.format(self.count))
f.seek(0)

View File

@@ -1,9 +1,5 @@
#pysha3==1.0.2
hexathon~=0.1.5
hexathon~=0.1.7
leveldir~=0.3.0
#alembic==1.4.2
#SQLAlchemy==1.3.20
confini~=0.6.0
#pyxdg~=0.27
chainlib~=0.1.1
shep~=0.2.3
confini~=0.6.1
chainlib~=0.4.0
shep~=0.3.0

View File

@@ -1,16 +1,14 @@
[metadata]
name = chainqueue
version = 0.1.7
version = 0.2.2
description = Generic blockchain transaction queue control
author = Louis Holbrook
author_email = dev@holbrook.no
url = https://gitlab.com/chaintool/chainqueue
url = https://git.defalsify.org/chainqueue.git
keywords =
cic
dlt
cryptocurrency
ethereum
solidarity
mutual_credit
classifiers =
Programming Language :: Python :: 3
Operating System :: OS Independent
@@ -20,9 +18,9 @@ classifiers =
License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
Topic :: Internet
# Topic :: Blockchain :: EVM
license = GPL3
license = OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)
licence_files =
LICENSE.txt
LICENSE
[options]
python_requires = >= 3.7
@@ -34,7 +32,9 @@ packages =
chainqueue.store
chainqueue.runnable
chainqueue.cli
chainqueue.data
#[options.entry_points]
#console_scripts =
# chainqueue-list = chainqueue.runnable.list:main
[options.entry_points]
console_scripts =
chainqueue-list = chainqueue.runnable.list:main
chainqueue-state = chainqueue.runnable.state:main

View File

@@ -32,7 +32,7 @@ class TestEntry(TestShepBase):
entry = QueueEntry(self.store, cache_adapter=MockCacheTokenTx)
tx_hash_two = entry.create(signed_tx)
txs = self.store.by_state()
txs = self.store.by_state(include_pending=True)
self.assertEqual(len(txs), 2)
logg.debug('tx hash one {}'.format(tx_hash_one))
@@ -40,14 +40,14 @@ class TestEntry(TestShepBase):
entry.load()
entry.sent()
txs = self.store.by_state()
txs = self.store.by_state(include_pending=True)
self.assertEqual(len(txs), 1)
txs = self.store.by_state(state=self.store.IN_NETWORK)
self.assertEqual(len(txs), 1)
entry.succeed(None, None)
txs = self.store.by_state()
txs = self.store.by_state(include_pending=True)
self.assertEqual(len(txs), 1)
entry = QueueEntry(self.store, tx_hash_two)
@@ -78,7 +78,7 @@ class TestEntry(TestShepBase):
entry = QueueEntry(self.store, tx_hash, cache_adapter=MockCacheTokenTx)
entry.load()
self.assertEqual(str(entry), tx_hash + ': SENDFAIL')
self.assertEqual(str(entry.tx_hash), tx_hash)
if __name__ == '__main__':