Compare commits

...

25 Commits

Author SHA1 Message Date
lash
b7a84acdb4
Bump minor version 2022-05-14 16:29:02 +00:00
lash
b7953cbd0f
Add data module 2022-05-14 12:38:10 +00:00
lash
c5bd4aad3a
Update cli module for chainlib 0.3.0, fix remaining settings processing 2022-05-13 13:46:30 +00:00
lash
c45e6e3310
Implement settings on chainlib 0.3.0 structure 2022-05-13 09:29:54 +00:00
lash
97fbec6477
Rehabilitate tests after by state signature change 2022-05-09 19:40:41 +00:00
lash
2a9bf41cf5
Deduplicate multiple matches in by-state list 2022-05-06 14:00:05 +00:00
lash
c54661d39e
Add prerelease version 2022-05-06 08:47:51 +00:00
lash
ddeae91611
Add state parser cli tool 2022-05-06 08:47:29 +00:00
lash
d94cf08719
Rehabilitate list tool, enhance settings and config 2022-05-06 08:30:14 +00:00
lash
a0f8960643
Update changelog 2022-05-05 17:06:30 +00:00
lash
ce0f29d982
Upgrade shep, omit sync on persist set 2022-05-05 17:03:19 +00:00
lash
263d4df300
Update changelog 2022-05-05 15:42:08 +00:00
lash
029deead75
Upgrade shep to handle exception in filestore list 2022-05-05 15:37:19 +00:00
lash
da9fb5925d
Upgrade shep 2022-05-05 15:11:01 +00:00
lash
cbf00281c6
Remove sync for each get 2022-05-05 14:22:21 +00:00
lash
01ad409077
Raise correct error in index store exists check 2022-05-04 18:37:02 +00:00
lash
3a8ec01588
Allow for sync skip in queue store instantiation 2022-05-04 05:44:47 +00:00
lash
b63793fd9b
Add purge to chainqueue store object 2022-05-02 20:21:51 +00:00
lash
84b8eb10e6
Remove spam logline 2022-05-01 07:40:32 +00:00
lash
532ff230b4
Remove race waits (defer to client layer) 2022-05-01 06:44:33 +00:00
lash
f7c09acfe2
Add race delay 2022-05-01 06:27:52 +00:00
lash
04d9901f0d
Allow backend objects to move between sync and get 2022-04-30 18:31:02 +00:00
lash
b8c2b1b86a
Sort statewise results 2022-04-30 16:43:55 +00:00
lash
c94b291d39
Add upcoming tests, event callback pass to shep 2022-04-30 05:42:44 +00:00
lash
6c360ca2e5
Add reserved check method 2022-04-29 06:28:01 +00:00
19 changed files with 413 additions and 271 deletions

View File

@ -1,3 +1,39 @@
- 0.2.0
* Implement chainlib 0.3.0
- 0.1.16
* Queue list cli tool
* State parser cli tool
* Provide pluggable renderer capability for queue list cli tool
* Move path and state query parsing to settings module
* Add queue path and digest parameters to base config
- 0.1.15
* Upgrade shep to avoid sync in persist set
- 0.1.14
* Upgrade shep to handle exception in filestore list
- 0.1.13
* Remove sync on each get
* Upgrade shep to guarantee atomic state lock state
- 0.1.12
* Raise correct exception from index store exists check
- 0.1.11
* Allow for sync skip in store instantiation
- 0.1.10
* Improve logging
- 0.1.9
* Upgrade deps
- 0.1.8
* Upgrade deps
- 0.1.7
* Improve logging
- 0.1.6
* Sort upcoming queue item chronologically
* Add unit testing for upcoming query method
- 0.1.5
* Add reserved state check method
- 0.1.4
* Dependency cleanups
- 0.1.3
* Add CLI args and config handling, settings object
- 0.1.2
* Add CLI inspection tools
- 0.1.1

View File

@ -1,153 +0,0 @@
# standard imports
import logging
import enum
# external imports
from hexathon import add_0x
# local imports
from chainqueue.enum import (
StatusBits,
all_errors,
is_alive,
is_error_status,
status_str,
)
logg = logging.getLogger(__name__)
class OutputCol(enum.Enum):
chainspec = 0
hash = 1
statustext = 2
statuscode = 3
signedtx = 4
class Outputter:
"""Output helper for chainqueue cli listings tools.
:param chain_spec: Chain spec to use as getter context
:type chain_spec: chainlib.chain.ChainSpec
:param writer: Writer to write output to. Will automatically flush.
:type writer: Writer
:param getter: Transaction getter
:type getter: See chainqueue.sql.backend.get_otx
:param session_method: Backend session generator method
:type session_method: varies
:param decode_status: Print status bit details
:type decode_status: bool
"""
all_cols = [
OutputCol.chainspec,
OutputCol.hash,
OutputCol.signedtx,
OutputCol.statustext,
OutputCol.statuscode,
]
default_cols = [
OutputCol.chainspec,
OutputCol.hash,
OutputCol.statustext,
OutputCol.statuscode,
]
def __init__(self, chain_spec, writer, getter, session_method=None, decode_status=True, cols=None):
self.decode_status = decode_status
self.writer = writer
self.getter = getter
self.chain_spec = chain_spec
self.chain_spec_str = str(chain_spec)
self.session = None
if session_method != None:
self.session = session_method()
self.results = {
'pending_error': 0,
'final_error': 0,
'pending': 0,
'final': 0,
}
debug_col_name = []
if cols == None:
self.cols = Outputter.default_cols
else:
self.cols = []
for col in cols:
v = getattr(OutputCol, col)
self.cols.append(v)
for col in self.cols:
debug_col_name.append(col.name)
logg.debug('outputter initialized with cols: {}'.format(','.join(debug_col_name)))
def __del__(self):
if self.session != None:
self.session.close()
def add(self, tx_hash):
"""Retrieve a transaction by hash and add it for summary output generation.
:param tx_hash: Transaction hash
:type tx_hash: str
"""
tx = self.getter(self.chain_spec, tx_hash, session=self.session)
self.__add(tx)
def __add(self, tx):
category = None
if is_alive(tx['status_code']):
category = 'pending'
else:
category = 'final'
self.results[category] += 1
if is_error_status(tx['status_code']):
logg.debug('registered {} as {} with error'.format(tx['tx_hash'], category))
self.results[category + '_error'] += 1
else:
logg.debug('registered {} as {}'.format(tx['tx_hash'], category))
def decode_summary(self):
"""Writes summary to the registered writer.
"""
self.writer.write('pending\t{}\t{}\n'.format(self.results['pending'], self.results['pending_error']))
self.writer.write('final\t{}\t{}\n'.format(self.results['final'], self.results['final_error']))
self.writer.write('total\t{}\t{}\n'.format(self.results['final'] + self.results['pending'], self.results['final_error'] + self.results['pending_error']))
def decode_single(self, tx_hash):
"""Retrieves the transaction with the given hash and writes the details to the underlying writer.
Registers the transaction with the summary generator.
:param tx_hash: Transaction hash
:type tx_hash: str
"""
tx = self.getter(self.chain_spec, tx_hash, session=self.session)
self.__add(tx)
status = tx['status']
if self.decode_status:
status = status_str(tx['status_code'], bits_only=True)
vals = [
self.chain_spec_str,
add_0x(tx_hash),
status,
str(tx['status_code']),
add_0x(tx['signed_tx']),
]
i = 0
l = len(self.cols)
for col in self.cols:
self.writer.write(vals[col.value])
i += 1
if i == l:
self.writer.write('\n')
else:
self.writer.write('\t')

View File

@ -1,11 +0,0 @@
# standard imports
import os
# local imports
from .arg import process_flags
from .config import process_config
__script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(os.path.dirname(__script_dir), 'data')
config_dir = os.path.join(data_dir, 'config')

View File

@ -1,2 +1,8 @@
def process_flags(argparser, flags):
argparser.add_argument('--backend', type=str, help='Backend to use for state store')
def apply_flag(flag):
flag.add('queue')
return flag
def apply_arg(arg):
arg.add_long('tx-digest-size', 'queue', type=int, help='Size of transaction hash in bytes')
return arg

View File

@ -2,6 +2,8 @@ def process_config(config, args, flags):
args_override = {}
args_override['QUEUE_BACKEND'] = getattr(args, 'backend')
args_override['TX_DIGEST_SIZE'] = getattr(args, 'tx_digest_size')
args_override['QUEUE_STATE_PATH'] = getattr(args, 'state_dir')
config.dict_override(args_override, 'local cli args')

View File

@ -1,2 +1,8 @@
[queue]
backend = mem
state_path =
index_path =
counter_path =
[tx]
digest_size = 32

View File

@ -1,7 +1,7 @@
# standard imports
import logging
# ecxternal imports
# external imports
from hexathon import (
add_0x,
strip_0x,
@ -44,6 +44,12 @@ class QueueEntry:
return tx_hash
def local_state(self):
state = self.store.state(self.k)
state_str = self.store.name(state)
return (state, state_str,)
def load(self):
(self.k, self.signed_tx) = self.store.get(self.tx_hash)
self.synced = True
@ -134,8 +140,12 @@ class QueueEntry:
self.store.cache.set_block(self.tx_hash, block, tx)
def test(self, state):
return self.__match_state(state)
def __str__(self):
v = self.store.get(self.tx_hash)
n = self.store.state(v[0])
s = self.store.name(n)
return '{}: {}'.format(self.tx_hash, s)
return '{}: {} ({})'.format(self.k, s, n)

View File

@ -24,12 +24,6 @@ class CacheIntegrityError(ChainQueueException):
pass
class BackendIntegrityError(ChainQueueException):
"""Raised when queue backend has invalid state
"""
pass
class DuplicateTxError(ChainQueueException):
"""Backend already knows transaction
"""

View File

@ -4,15 +4,19 @@
import os
import logging
import sys
import importlib
# external imports
from hexathon import add_0x
import chainlib.cli
from chainlib.chain import ChainSpec
from crypto_dev_signer.eth.signer import ReferenceSigner as EIP155Signer
# local imports
from chainqueue.cli.output import Outputter
import chainqueue.cli
#from chainqueue.cli.output import Outputter
from chainqueue.settings import ChainqueueSettings
from chainqueue.store import Store
from chainqueue.entry import QueueEntry
logging.basicConfig(level=logging.WARNING)
@ -21,30 +25,31 @@ logg = logging.getLogger()
script_dir = os.path.dirname(os.path.realpath(__file__))
config_dir = os.path.join(script_dir, '..', 'data', 'config')
arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC
arg_flags = chainlib.cli.argflag_std_base | chainlib.cli.Flag.CHAIN_SPEC | chainlib.cli.Flag.UNSAFE
argparser = chainlib.cli.ArgumentParser(arg_flags)
argparser.add_argument('--backend', type=str, default='sql', help='Backend to use (currently only "sql")')
argparser.add_argument('--start', type=str, help='Oldest transaction hash to include in results')
argparser.add_argument('--end', type=str, help='Newest transaction hash to include in results')
argparser.add_argument('--backend', type=str, default='sql', help='Backend to use')
argparser.add_argument('--error', action='store_true', help='Only show transactions which have error state')
argparser.add_argument('--pending', action='store_true', help='Omit finalized transactions')
argparser.add_argument('--status-mask', type=int, dest='status_mask', help='Manually specify status bitmask value to match (overrides --error and --pending)')
argparser.add_argument('--summary', action='store_true', help='output summary for each status category')
argparser.add_argument('-o', '--column', dest='column', action='append', type=str, help='add a column to display')
argparser.add_positional('address', type=str, help='Ethereum address of recipient')
argparser.add_argument('--no-final', action='store_true', dest='no_final', help='Omit finalized transactions')
argparser.add_argument('--status-mask', type=str, dest='status_mask', action='append', default=[], help='Manually specify status bitmask value to match (overrides --error and --pending)')
argparser.add_argument('--exact', action='store_true', help='Match status exact')
argparser.add_argument('--include-pending', action='store_true', dest='include_pending', help='Include transactions in unprocessed state (pending)')
argparser.add_argument('--renderer', type=str, default=[], action='append', help='Transaction renderer for output')
argparser.add_positional('address', required=False, type=str, help='Ethereum address of recipient')
args = argparser.parse_args()
extra_args = {
'address': None,
'backend': None,
'start': None,
'end': None,
'state_dir': None,
'exact': None,
'error': None,
'pending': None,
'include_pending': '_PENDING',
'status_mask': None,
'column': None,
'summary': None,
'no_final': None,
'renderer': None,
}
config = chainlib.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)
config = chainqueue.cli.config.process_config(config, args, 0)
logg.debug('config loaded:\n{}'.format(config))
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
@ -58,36 +63,40 @@ if status_mask == None:
tx_getter = None
tx_lister = None
session_method = None
if config.get('_BACKEND') == 'sql':
from chainqueue.sql.query import get_account_tx as tx_lister
from chainqueue.sql.query import get_tx_cache as tx_getter
from chainqueue.runnable.sql import setup_backend
from chainqueue.db.models.base import SessionBase
setup_backend(config, debug=config.true('DATABASE_DEBUG'))
session_method = SessionBase.create_session
else:
raise NotImplementedError('backend {} not implemented'.format(config.get('_BACKEND')))
#output_cols = config.get('_COLUMN')
output_cols = config.get('_COLUMN')
renderers_mods = []
for renderer in config.get('_RENDERER'):
m = importlib.import_module(renderer)
renderers_mods.append(m)
logg.info('using renderer module {}'.format(renderer))
settings = ChainqueueSettings()
settings.process(config)
logg.debug('settings:\n{}'.format(settings))
def main():
since = config.get('_START', None)
if since != None:
since = add_0x(since)
until = config.get('_END', None)
if until != None:
until = add_0x(until)
txs = tx_lister(chain_spec, config.get('_ADDRESS'), since=since, until=until, status=status_mask, not_status=not_status_mask)
outputter = Outputter(chain_spec, sys.stdout, tx_getter, session_method=session_method, decode_status=config.true('_RAW'), cols=output_cols)
if config.get('_SUMMARY'):
for k in txs.keys():
outputter.add(k)
outputter.decode_summary()
else:
for k in txs.keys():
outputter.decode_single(k)
# since = config.get('_START', None)
# if since != None:
# since = add_0x(since)
# until = config.get('_END', None)
# if until != None:
# until = add_0x(until)
# txs = tx_lister(chain_spec, config.get('_ADDRESS'), since=since, until=until, status=status_mask, not_status=not_status_mask)
txs = settings.get('QUEUE_STORE').by_state(state=settings.get('QUEUE_STATUS_FILTER'), strict=config.get('_EXACT'), include_pending=config.get('_PENDING'))
for i, tx_hash in enumerate(txs):
entry = QueueEntry(settings.get('QUEUE_STORE'), tx_hash)
entry.load()
v = None
if len(renderers_mods) == 0:
v = str(entry)
else:
for m in renderers_mods:
v = m.apply(i, settings, v, settings.get('CHAIN_SPEC'), entry)
print(v)
if __name__ == '__main__':
main()

View File

@ -1,14 +0,0 @@
# standard imports
import logging
# local imports
from chainqueue.db.models.base import SessionBase
from chainqueue.db import dsn_from_config
logg = logging.getLogger(__name__)
def setup_backend(config, debug=False):
dsn = dsn_from_config(config)
logg.debug('dsn {}'.format(dsn))
SessionBase.connect(dsn, debug=debug)

View File

@ -0,0 +1,51 @@
# SPDX-License-Identifier: GPL-3.0-or-later
# standard imports
import os
import logging
import argparse
# local imports
from chainqueue.state import Status
argparser = argparse.ArgumentParser()
argparser.add_argument('-r', '--raw', dest='r', action='store_true', help='Always print pure state element strings')
argparser.add_argument('state', type=str, help='State to interpret')
args = argparser.parse_args()
status_interpreter = Status(None, allow_invalid=True)
def handle_numeric(v, elements=False):
if elements:
if not status_interpreter.is_pure(v):
return status_interpreter.elements(v)
return status_interpreter.name(v)
def handle_string(v):
try:
return status_interpreter.from_name(v)
except AttributeError:
return status_interpreter.from_elements(v)
def main():
v = None
numeric = False
try:
v = int(args.state)
numeric = True
except:
v = args.state
r = None
if numeric:
r = handle_numeric(v, elements=args.r)
else:
r = handle_string(v)
print(r)
if __name__ == '__main__':
main()

View File

@ -1,8 +1,99 @@
# standard imports
import os
import logging
# external imports
from chainlib.settings import ChainSettings
from chainqueue.state import Status
from chainqueue.store import Store
logg = logging.getLogger(__name__)
class ChainqueueSettings(ChainSettings):
def process_queue_tx(settings, config):
settings.set('TX_DIGEST_SIZE', config.get('TX_DIGEST_SIZE'))
return settings
def process_queue_backend(self, config):
self.o['QUEUE_BACKEND'] = config.get('QUEUE_BACKEND')
def process_queue_store(settings, config):
status = Status(settings.get('QUEUE_STORE_FACTORY'), allow_invalid=True)
settings.set('QUEUE_STATE_STORE', status)
store = Store(
settings.get('CHAIN_SPEC'),
settings.get('QUEUE_STATE_STORE'),
settings.get('QUEUE_INDEX_STORE'),
settings.get('QUEUE_COUNTER_STORE'),
sync=True,
)
settings.set('QUEUE_STORE', store)
return settings
def process_queue_paths(settings, config):
index_dir = config.get('QUEUE_INDEX_PATH')
if index_dir == None:
index_dir = os.path.join(config.get('STATE_PATH'), 'tx')
counter_dir = config.get('QUEUE_COUNTER_PATH')
if counter_dir == None:
counter_dir = os.path.join(config.get('STATE_PATH'))
settings.set('QUEUE_STATE_PATH', config.get('STATE_PATH'))
settings.set('QUEUE_INDEX_PATH', index_dir)
settings.set('QUEUE_COUNTER_PATH', counter_dir)
return settings
def process_queue_backend_fs(settings, config):
from chainqueue.store.fs import IndexStore
from chainqueue.store.fs import CounterStore
from shep.store.file import SimpleFileStoreFactory
index_store = IndexStore(settings.o['QUEUE_INDEX_PATH'], digest_bytes=int(settings.o['TX_DIGEST_SIZE']))
counter_store = CounterStore(settings.o['QUEUE_COUNTER_PATH'])
factory = SimpleFileStoreFactory(settings.o['QUEUE_STATE_PATH'], use_lock=True).add
settings.set('QUEUE_INDEX_STORE', index_store)
settings.set('QUEUE_COUNTER_STORE', counter_store)
settings.set('QUEUE_STORE_FACTORY', factory)
return settings
def process_queue_status_filter(settings, config):
states = 0
store = settings.get('QUEUE_STATE_STORE')
if len(config.get('_STATUS_MASK')) == 0:
for v in store.all(numeric=True):
states |= v
logg.debug('state store {}'.format(states))
else:
for v in config.get('_STATUS_MASK'):
try:
states |= int(v)
continue
except ValueError:
pass
state = store.from_name(v)
logg.debug('resolved state argument {} to numeric state {}'.format(v, state))
states |= state
settings.set('QUEUE_STATUS_FILTER', states)
return settings
def process_queue(settings, config):
settings = process_queue_tx(settings, config)
settings = process_queue_paths(settings, config)
if config.get('QUEUE_BACKEND') == 'fs':
settings = process_queue_backend_fs(settings, config)
settings = process_queue_backend(settings, config)
settings = process_queue_store(settings, config)
settings = process_queue_status_filter(settings, config)
return settings
def process_settings(settings, config):
super(ChainqueueSettings, settings).process(config)
settings = settings.process_queue(settings, config)
return settings

View File

@ -105,11 +105,13 @@ class Verify:
class Status(shep.persist.PersistedState):
bits = 12
def __init__(self, store_factory):
def __init__(self, store_factory, allow_invalid=False, event_callback=None):
verify = Verify().verify
self.set_default_state('PENDING')
super(Status, self).__init__(store_factory, 12, verifier=verify)
super(Status, self).__init__(store_factory, self.bits, verifier=verify, check_alias=not allow_invalid, event_callback=event_callback)
self.add('QUEUED')
self.add('RESERVED')
self.add('IN_NETWORK')

View File

@ -2,12 +2,15 @@
import re
import datetime
import logging
import time
# local imports
from chainqueue.cache import CacheTx
from chainqueue.entry import QueueEntry
from chainqueue.error import (
NotLocalTxError,
from chainqueue.error import NotLocalTxError
from chainqueue.enum import (
StatusBits,
all_errors,
)
logg = logging.getLogger(__name__)
@ -21,11 +24,12 @@ def from_key(k):
(ts_str, seq_str, tx_hash) = k.split('_')
return (float(ts_str), int(seq_str), tx_hash, )
all_local_errors = all_errors() - StatusBits.NETWORK_ERROR
re_u = r'^[^_][_A-Z]+$'
class Store:
def __init__(self, chain_spec, state_store, index_store, counter, cache=None):
def __init__(self, chain_spec, state_store, index_store, counter, cache=None, sync=True):
self.chain_spec = chain_spec
self.cache = cache
self.state_store = state_store
@ -43,9 +47,21 @@ class Store:
'unset',
'name',
'modified',
'purge',
]:
setattr(self, v, getattr(self.state_store, v))
self.state_store.sync()
if not sync:
return
sync_err = None
try:
self.state_store.sync()
except Exception as e:
sync_err = e
if sync_err != None:
raise FileNotFoundError(sync_err)
def put(self, v, cache_adapter=CacheTx):
@ -63,30 +79,53 @@ class Store:
def get(self, k):
v = None
s = self.index_store.get(k)
err = None
try:
s = self.index_store.get(k)
except FileNotFoundError:
raise NotLocalTxError(k)
self.state_store.sync()
v = self.state_store.get(s)
v = self.state_store.get(s)
except FileNotFoundError as e:
err = e
if v == None:
raise NotLocalTxError('could not find tx {}: {}'.format(k, err))
return (s, v,)
def by_state(self, state=0, limit=4096, strict=False, threshold=None):
def by_state(self, state=0, not_state=0, include_pending=False, limit=4096, strict=False, threshold=None):
hashes = []
i = 0
refs_state = []
if state > 0:
if self.state_store.is_pure(state):
refs_state = self.state_store.list(state)
elif strict:
refs_state = self.state_store.list(state)
else:
for v in self.state_store.elements(state, numeric=True):
refs_state += self.state_store.list(v)
refs_state = list(set(refs_state))
if include_pending:
refs_state += self.state_store.list(0)
refs_state.sort()
refs_state = self.state_store.list(state)
for ref in refs_state:
v = from_key(ref)
hsh = v[2]
item_state = self.state_store.state(ref)
if strict:
item_state = self.state_store.state(ref)
if item_state & state != item_state:
continue
if item_state & not_state > 0:
continue
item_state_str = self.state_store.name(item_state)
logg.info('state {} {} ({})'.format(ref, item_state_str, item_state))
if threshold != None:
v = self.state_store.modified(ref)
if v > threshold:
@ -94,9 +133,11 @@ class Store:
hashes.append(hsh)
i += 1
if limit > 0 and i == limit:
break
hashes.sort()
#hashes.sort()
return hashes
@ -108,8 +149,19 @@ class Store:
return self.by_state(state=self.DEFERRED, limit=limit, threshold=threshold)
def failed(self, limit=4096):
#return self.by_state(state=all_local_errors, limit=limit)
r = []
r += self.by_state(state=self.LOCAL_ERROR, limit=limit)
r += self.by_state(state=self.NODE_ERROR, limit=limit)
r.sort()
if len(r) > limit:
r = r[:limit]
return r
def pending(self, limit=4096):
return self.by_state(state=0, limit=limit, strict=True)
return self.by_state(include_pending=True, limit=limit, strict=True)
def reserve(self, k):
@ -130,6 +182,7 @@ class Store:
def fail(self, k):
entry = QueueEntry(self, k)
entry.load()
logg.debug('fail {}'.format(k))
entry.sendfail()
@ -153,3 +206,13 @@ class Store:
entry = QueueEntry(self, k)
entry.load()
entry.sent()
def is_reserved(self, k):
entry = QueueEntry(self, k)
entry.load()
return entry.test(self.RESERVED)
def sync(self):
self.state_store.sync()

View File

@ -6,7 +6,10 @@ import logging
from leveldir.hex import HexDir
# local imports
from chainqueue.error import DuplicateTxError
from chainqueue.error import (
DuplicateTxError,
NotLocalTxError,
)
logg = logging.getLogger(__name__)
@ -22,7 +25,7 @@ class IndexStore(HexDir):
existing = None
try:
existing = self.get(k)
except FileNotFoundError:
except NotLocalTxError:
pass
return existing != None
@ -37,7 +40,14 @@ class IndexStore(HexDir):
def get(self, k):
fp = self.store.to_filepath(k)
f = open(fp, 'rb')
f = None
err = None
try:
f = open(fp, 'rb')
except FileNotFoundError as e:
err = e
if err != None:
raise NotLocalTxError(err)
v = f.read()
f.close()
return v.decode('utf-8')
@ -64,7 +74,7 @@ class CounterStore:
v = f.read(8)
self.count = int.from_bytes(v, byteorder='big')
logg.info('counter starts at {}'.format(self.count))
logg.debug('counter starts at {}'.format(self.count))
f.seek(0)

View File

@ -1,9 +1,5 @@
#pysha3==1.0.2
hexathon~=0.1.5
hexathon~=0.1.7
leveldir~=0.3.0
#alembic==1.4.2
#SQLAlchemy==1.3.20
confini~=0.6.0
#pyxdg~=0.27
chainlib~=0.1.1
shep~=0.2.3
confini~=0.6.1
chainlib~=0.3.0
shep~=0.2.9

View File

@ -1,6 +1,6 @@
[metadata]
name = chainqueue
version = 0.1.3
version = 0.2.0
description = Generic blockchain transaction queue control
author = Louis Holbrook
author_email = dev@holbrook.no
@ -34,7 +34,9 @@ packages =
chainqueue.store
chainqueue.runnable
chainqueue.cli
chainqueue.data
#[options.entry_points]
#console_scripts =
# chainqueue-list = chainqueue.runnable.list:main
[options.entry_points]
console_scripts =
chainqueue-list = chainqueue.runnable.list:main
chainqueue-state = chainqueue.runnable.state:main

View File

@ -32,7 +32,7 @@ class TestEntry(TestShepBase):
entry = QueueEntry(self.store, cache_adapter=MockCacheTokenTx)
tx_hash_two = entry.create(signed_tx)
txs = self.store.by_state()
txs = self.store.by_state(include_pending=True)
self.assertEqual(len(txs), 2)
logg.debug('tx hash one {}'.format(tx_hash_one))
@ -40,14 +40,14 @@ class TestEntry(TestShepBase):
entry.load()
entry.sent()
txs = self.store.by_state()
txs = self.store.by_state(include_pending=True)
self.assertEqual(len(txs), 1)
txs = self.store.by_state(state=self.store.IN_NETWORK)
self.assertEqual(len(txs), 1)
entry.succeed(None, None)
txs = self.store.by_state()
txs = self.store.by_state(include_pending=True)
self.assertEqual(len(txs), 1)
entry = QueueEntry(self.store, tx_hash_two)
@ -78,7 +78,7 @@ class TestEntry(TestShepBase):
entry = QueueEntry(self.store, tx_hash, cache_adapter=MockCacheTokenTx)
entry.load()
self.assertEqual(str(entry), tx_hash + ': SENDFAIL')
self.assertEqual(str(entry.tx_hash), tx_hash)
if __name__ == '__main__':

View File

@ -6,14 +6,23 @@ import logging
import shutil
# external imports
from chainlib.chain import ChainSpec
from shep.store.noop import NoopStoreFactory
# local imports
from chainqueue.store.fs import (
IndexStore,
CounterStore,
)
from chainqueue.store.base import Store
from chainqueue.error import DuplicateTxError
from chainqueue.state import Status
# tests imports
from tests.common import (
MockTokenCache,
MockCacheTokenTx,
)
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
@ -58,5 +67,38 @@ class TestStoreImplementations(unittest.TestCase):
store.put(hx, data)
def test_upcoming_limit(self):
index_store = IndexStore(self.path)
counter_store = CounterStore(self.path)
chain_spec = ChainSpec('foo', 'bar', 42, 'baz')
factory = NoopStoreFactory().add
state_store = Status(factory)
cache_store = MockTokenCache()
queue_store = Store(chain_spec, state_store, index_store, counter_store, cache=cache_store)
txs = []
for i in range(3):
tx_src = os.urandom(128).hex()
tx = queue_store.put(tx_src, cache_adapter=MockCacheTokenTx)
txs.append(tx)
r = queue_store.upcoming(limit=3)
self.assertEqual(len(r), 0)
for tx in txs:
queue_store.enqueue(tx[1])
r = queue_store.upcoming(limit=3)
self.assertEqual(len(r), 3)
queue_store.send_start(txs[0][1])
r = queue_store.upcoming(limit=3)
self.assertEqual(len(r), 2)
queue_store.send_end(txs[0][1])
r = queue_store.upcoming(limit=3)
self.assertEqual(len(r), 2)
if __name__ == '__main__':
unittest.main()