From a6e48d93a8a9912a0b573286bdd1671606143603 Mon Sep 17 00:00:00 2001 From: lash Date: Sun, 13 Mar 2022 15:40:45 +0000 Subject: [PATCH] Implement upcoming query on store --- chainqueue/cache/__init__.py | 1 + chainqueue/{cache.py => cache/base.py} | 0 chainqueue/cache/fs.py | 10 ++++++ chainqueue/entry.py | 7 +++++ chainqueue/store/base.py | 42 ++++++++++++++++++-------- requirements.txt | 2 +- tests/test_integrate.py | 10 ++++++ 7 files changed, 58 insertions(+), 14 deletions(-) create mode 100644 chainqueue/cache/__init__.py rename chainqueue/{cache.py => cache/base.py} (100%) create mode 100644 chainqueue/cache/fs.py diff --git a/chainqueue/cache/__init__.py b/chainqueue/cache/__init__.py new file mode 100644 index 0000000..9b5ed21 --- /dev/null +++ b/chainqueue/cache/__init__.py @@ -0,0 +1 @@ +from .base import * diff --git a/chainqueue/cache.py b/chainqueue/cache/base.py similarity index 100% rename from chainqueue/cache.py rename to chainqueue/cache/base.py diff --git a/chainqueue/cache/fs.py b/chainqueue/cache/fs.py new file mode 100644 index 0000000..a0f822f --- /dev/null +++ b/chainqueue/cache/fs.py @@ -0,0 +1,10 @@ +# local imports +from .base import Cache + + +class FsCache(Cache): + + def __init__(self, path): + self.path = path + + diff --git a/chainqueue/entry.py b/chainqueue/entry.py index 0df3ac0..1b0059c 100644 --- a/chainqueue/entry.py +++ b/chainqueue/entry.py @@ -118,3 +118,10 @@ class QueueEntry: def succeed(self, block): self.store.set(self.k, self.store.FINAL) + + + def __str__(self): + v = self.store.get(self.tx_hash) + n = self.store.state(v[0]) + s = self.store.name(n) + return '{}: {}'.format(self.tx_hash, s) diff --git a/chainqueue/store/base.py b/chainqueue/store/base.py index bcefbde..93ffdec 100644 --- a/chainqueue/store/base.py +++ b/chainqueue/store/base.py @@ -5,6 +5,7 @@ import datetime # local imports from chainqueue.cache import CacheTx +from chainqueue.entry import QueueEntry logg = logging.getLogger(__name__) @@ -33,7 +34,7 @@ class Store: continue v = self.state_store.from_name(s) setattr(self, s, v) - for v in ['state', 'change', 'set', 'unset']: + for v in ['state', 'change', 'set', 'unset', 'name']: setattr(self, v, getattr(self.state_store, v)) @@ -59,23 +60,38 @@ class Store: hashes = [] i = 0 - hashes_state = self.state_store.list(state) - if strict: - for k in hashes_state: - item_state = self.state_store.state(k) + refs_state = self.state_store.list(state) + + for ref in refs_state: + v = from_key(ref) + hsh = v[2] + + if strict: + item_state = self.state_store.state(ref) if item_state & state != item_state: continue - hashes.append(k) - else: - hashes = hashes_state + hashes.append(hsh) hashes.sort() - hashes_out = [] - for h in hashes: - pair = from_key(h) - hashes_out.append(pair[1]) - return hashes_out + return hashes def upcoming(self, limit=4096): return self.by_state(state=self.QUEUED, limit=limit) + + + def deferred(self, limit=4096): + return self.by_state(state=self.DEFERRED, limit=limit) + + + def pending(self, limit=4096): + return self.by_state(state=0, limit=limit, strict=True) + + + def enqueue(self, k): + entry = QueueEntry(self, k) + entry.load() + try: + entry.retry() + except StateTransitionInvalid: + entry.readysend() diff --git a/requirements.txt b/requirements.txt index 187788c..c87b094 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,4 @@ SQLAlchemy==1.3.20 confini~=0.6.0 pyxdg~=0.27 chainlib>=0.1.0b1,<=0.1.0 -shep~=0.1.1 +shep>=0.1.1rc1,<=0.2.0 diff --git a/tests/test_integrate.py b/tests/test_integrate.py index 33da5b5..ccb8101 100644 --- a/tests/test_integrate.py +++ b/tests/test_integrate.py @@ -43,5 +43,15 @@ class TestIntegrateBase(TestShepBase): self.store.put(b'foo'.hex(), b'bar'.hex(), cache_adapter=MockCacheTokenTx) + def test_state_move(self): + hx = b'foo'.hex() + self.store.put(hx, b'bar'.hex(), cache_adapter=MockCacheTokenTx) + self.store.get(hx) + self.store.enqueue(hx) + v = self.store.upcoming() + self.assertEqual(len(v), 1) + self.assertEqual(v[0], hx) + + if __name__ == '__main__': unittest.main()