Implement upcoming query on store

This commit is contained in:
lash 2022-03-13 15:40:45 +00:00
parent 51c8124a28
commit a6e48d93a8
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
7 changed files with 58 additions and 14 deletions

1
chainqueue/cache/__init__.py vendored Normal file
View File

@ -0,0 +1 @@
from .base import *

10
chainqueue/cache/fs.py vendored Normal file
View File

@ -0,0 +1,10 @@
# local imports
from .base import Cache
class FsCache(Cache):
def __init__(self, path):
self.path = path

View File

@ -118,3 +118,10 @@ class QueueEntry:
def succeed(self, block): def succeed(self, block):
self.store.set(self.k, self.store.FINAL) self.store.set(self.k, self.store.FINAL)
def __str__(self):
v = self.store.get(self.tx_hash)
n = self.store.state(v[0])
s = self.store.name(n)
return '{}: {}'.format(self.tx_hash, s)

View File

@ -5,6 +5,7 @@ import datetime
# local imports # local imports
from chainqueue.cache import CacheTx from chainqueue.cache import CacheTx
from chainqueue.entry import QueueEntry
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@ -33,7 +34,7 @@ class Store:
continue continue
v = self.state_store.from_name(s) v = self.state_store.from_name(s)
setattr(self, s, v) setattr(self, s, v)
for v in ['state', 'change', 'set', 'unset']: for v in ['state', 'change', 'set', 'unset', 'name']:
setattr(self, v, getattr(self.state_store, v)) setattr(self, v, getattr(self.state_store, v))
@ -59,23 +60,38 @@ class Store:
hashes = [] hashes = []
i = 0 i = 0
hashes_state = self.state_store.list(state) refs_state = self.state_store.list(state)
for ref in refs_state:
v = from_key(ref)
hsh = v[2]
if strict: if strict:
for k in hashes_state: item_state = self.state_store.state(ref)
item_state = self.state_store.state(k)
if item_state & state != item_state: if item_state & state != item_state:
continue continue
hashes.append(k) hashes.append(hsh)
else:
hashes = hashes_state
hashes.sort() hashes.sort()
hashes_out = [] return hashes
for h in hashes:
pair = from_key(h)
hashes_out.append(pair[1])
return hashes_out
def upcoming(self, limit=4096): def upcoming(self, limit=4096):
return self.by_state(state=self.QUEUED, limit=limit) return self.by_state(state=self.QUEUED, limit=limit)
def deferred(self, limit=4096):
return self.by_state(state=self.DEFERRED, limit=limit)
def pending(self, limit=4096):
return self.by_state(state=0, limit=limit, strict=True)
def enqueue(self, k):
entry = QueueEntry(self, k)
entry.load()
try:
entry.retry()
except StateTransitionInvalid:
entry.readysend()

View File

@ -6,4 +6,4 @@ SQLAlchemy==1.3.20
confini~=0.6.0 confini~=0.6.0
pyxdg~=0.27 pyxdg~=0.27
chainlib>=0.1.0b1,<=0.1.0 chainlib>=0.1.0b1,<=0.1.0
shep~=0.1.1 shep>=0.1.1rc1,<=0.2.0

View File

@ -43,5 +43,15 @@ class TestIntegrateBase(TestShepBase):
self.store.put(b'foo'.hex(), b'bar'.hex(), cache_adapter=MockCacheTokenTx) self.store.put(b'foo'.hex(), b'bar'.hex(), cache_adapter=MockCacheTokenTx)
def test_state_move(self):
hx = b'foo'.hex()
self.store.put(hx, b'bar'.hex(), cache_adapter=MockCacheTokenTx)
self.store.get(hx)
self.store.enqueue(hx)
v = self.store.upcoming()
self.assertEqual(len(v), 1)
self.assertEqual(v[0], hx)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()