From c94b291d394432746279b40e2822be2c3d9d39e2 Mon Sep 17 00:00:00 2001 From: lash Date: Sat, 30 Apr 2022 05:42:32 +0000 Subject: [PATCH] Add upcoming tests, event callback pass to shep --- chainqueue/state.py | 4 ++-- chainqueue/store/base.py | 21 +++++++++++++++++++- setup.cfg | 2 +- tests/test_store.py | 42 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 65 insertions(+), 4 deletions(-) diff --git a/chainqueue/state.py b/chainqueue/state.py index 87882d5..d82805f 100644 --- a/chainqueue/state.py +++ b/chainqueue/state.py @@ -106,10 +106,10 @@ class Verify: class Status(shep.persist.PersistedState): - def __init__(self, store_factory): + def __init__(self, store_factory, allow_invalid=False, event_callback=None): verify = Verify().verify self.set_default_state('PENDING') - super(Status, self).__init__(store_factory, 12, verifier=verify) + super(Status, self).__init__(store_factory, 12, verifier=verify, check_alias=not allow_invalid, event_callback=event_callback) self.add('QUEUED') self.add('RESERVED') self.add('IN_NETWORK') diff --git a/chainqueue/store/base.py b/chainqueue/store/base.py index 494cc8d..cf0fc64 100644 --- a/chainqueue/store/base.py +++ b/chainqueue/store/base.py @@ -9,6 +9,10 @@ from chainqueue.entry import QueueEntry from chainqueue.error import ( NotLocalTxError, ) +from chainqueue.enum import ( + StatusBits, + all_errors, + ) logg = logging.getLogger(__name__) @@ -21,6 +25,7 @@ def from_key(k): (ts_str, seq_str, tx_hash) = k.split('_') return (float(ts_str), int(seq_str), tx_hash, ) +all_local_errors = all_errors() - StatusBits.NETWORK_ERROR re_u = r'^[^_][_A-Z]+$' class Store: @@ -94,7 +99,9 @@ class Store: hashes.append(hsh) - + i += 1 + if limit > 0 and i == limit: + break hashes.sort() return hashes @@ -108,6 +115,17 @@ class Store: return self.by_state(state=self.DEFERRED, limit=limit, threshold=threshold) + def failed(self, limit=4096): + #return self.by_state(state=all_local_errors, limit=limit) + r = [] + r += self.by_state(state=self.LOCAL_ERROR, limit=limit) + r += self.by_state(state=self.NODE_ERROR, limit=limit) + r.sort() + if len(r) > limit: + r = r[:limit] + return r + + def pending(self, limit=4096): return self.by_state(state=0, limit=limit, strict=True) @@ -130,6 +148,7 @@ class Store: def fail(self, k): entry = QueueEntry(self, k) entry.load() + logg.debug('fail {}'.format(k)) entry.sendfail() diff --git a/setup.cfg b/setup.cfg index 43a41bb..b146450 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chainqueue -version = 0.1.5 +version = 0.1.6 description = Generic blockchain transaction queue control author = Louis Holbrook author_email = dev@holbrook.no diff --git a/tests/test_store.py b/tests/test_store.py index 39c6db2..00b5b99 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -6,14 +6,23 @@ import logging import shutil # external imports +from chainlib.chain import ChainSpec +from shep.store.noop import NoopStoreFactory # local imports from chainqueue.store.fs import ( IndexStore, CounterStore, ) +from chainqueue.store.base import Store from chainqueue.error import DuplicateTxError +from chainqueue.state import Status +# tests imports +from tests.common import ( + MockTokenCache, + MockCacheTokenTx, + ) logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger() @@ -58,5 +67,38 @@ class TestStoreImplementations(unittest.TestCase): store.put(hx, data) + def test_upcoming_limit(self): + index_store = IndexStore(self.path) + counter_store = CounterStore(self.path) + chain_spec = ChainSpec('foo', 'bar', 42, 'baz') + factory = NoopStoreFactory().add + state_store = Status(factory) + cache_store = MockTokenCache() + queue_store = Store(chain_spec, state_store, index_store, counter_store, cache=cache_store) + + txs = [] + for i in range(3): + tx_src = os.urandom(128).hex() + tx = queue_store.put(tx_src, cache_adapter=MockCacheTokenTx) + txs.append(tx) + + r = queue_store.upcoming(limit=3) + self.assertEqual(len(r), 0) + + for tx in txs: + queue_store.enqueue(tx[1]) + + r = queue_store.upcoming(limit=3) + self.assertEqual(len(r), 3) + + queue_store.send_start(txs[0][1]) + r = queue_store.upcoming(limit=3) + self.assertEqual(len(r), 2) + + queue_store.send_end(txs[0][1]) + r = queue_store.upcoming(limit=3) + self.assertEqual(len(r), 2) + + if __name__ == '__main__': unittest.main()