203 lines
4.8 KiB
Python
203 lines
4.8 KiB
Python
# standard imports
|
||
import re
|
||
import datetime
|
||
import logging
|
||
import time
|
||
|
||
# local imports
|
||
from chainqueue.cache import CacheTx
|
||
from chainqueue.entry import QueueEntry
|
||
from chainqueue.error import NotLocalTxError
|
||
from chainqueue.enum import (
|
||
StatusBits,
|
||
all_errors,
|
||
)
|
||
|
||
# Module-level logger, named after this module.
logg = logging.getLogger(__name__)
|
||
|
||
|
||
def to_key(t, n, k):
    """Build the state store key for a queue item.

    The three parts are rendered with their default string formatting and
    joined with underscores, in the order given.

    :param t: First key component (Store.put passes a timestamp here)
    :param n: Second key component (Store.put passes a counter value here)
    :param k: Third key component (Store.put passes the transaction hash here)
    :returns: Key string of the form "<t>_<n>_<k>"
    """
    return f'{t}_{n}_{k}'
|
||
|
||
|
||
def from_key(k):
    """Split a key of the form "<timestamp>_<sequence>_<hash>" into its parts.

    Only the first two underscores are treated as separators, so a hash
    component that itself contains underscores is returned intact.

    :param k: Key string with at least two underscores
    :raises ValueError: If the key has fewer than three components, or the
        first two components cannot be parsed as float and int respectively
    :returns: Tuple of (timestamp as float, sequence as int, hash as str)
    """
    # maxsplit=2 keeps any further underscores inside the hash component.
    (ts_str, seq_str, tx_hash) = k.split('_', 2)
    return (float(ts_str), int(seq_str), tx_hash,)
|
||
|
||
# Result of all_errors() with StatusBits.NETWORK_ERROR subtracted.
# NOTE(review): within this file it is only referenced from a commented-out
# line in Store.failed -- confirm whether other modules import it.
all_local_errors = all_errors() - StatusBits.NETWORK_ERROR
|
||
|
||
# Pattern Store.__init__ applies to the attribute names of the state store to
# decide which ones to mirror: the first character is anything except an
# underscore, and the rest are one or more uppercase letters or underscores.
re_u = r'^[^_][_A-Z]+$'
|
||
class Store:
    """Transaction queue front-end on top of a state store and an index store.

    Transaction payloads are written to the state store under a key built by
    to_key(); the index store maps each transaction hash to that key. An
    optional cache object additionally receives every deserialized
    transaction that is added.

    On construction, every attribute name of the state store that matches
    re_u is resolved with state_store.from_name() and set as an attribute of
    the same name on this object. The state store methods state, change, set,
    unset, name, modified and purge are exposed on this object as well.

    :param chain_spec: Chain spec, passed to the cache adapter and the cache
    :param state_store: State store backend
    :param index_store: Index store backend, mapping tx hash to state store key
    :param counter: Object whose next() supplies the sequence part of new keys
    :param cache: Optional cache backend; skipped when None
    :raises FileNotFoundError: If the initial state store sync fails for any
        reason; the original exception is attached as the cause
    """

    def __init__(self, chain_spec, state_store, index_store, counter, cache=None):
        self.chain_spec = chain_spec
        self.cache = cache
        self.state_store = state_store
        self.index_store = index_store
        self.counter = counter

        # Mirror the state store's named states as attributes of this object.
        for name in dir(self.state_store):
            if re.match(re_u, name):
                setattr(self, name, self.state_store.from_name(name))

        # Expose selected state store methods directly on this object.
        for method in (
                'state',
                'change',
                'set',
                'unset',
                'name',
                'modified',
                'purge',
                ):
            setattr(self, method, getattr(self.state_store, method))

        # Any sync failure is reported as FileNotFoundError (the type callers
        # already receive); chaining keeps the original error visible.
        try:
            self.state_store.sync()
        except Exception as e:
            raise FileNotFoundError(e) from e


    def _load_entry(self, k):
        """Create the QueueEntry for the given tx hash and load it.

        :param k: Transaction hash
        :returns: The loaded QueueEntry
        """
        entry = QueueEntry(self, k)
        entry.load()
        return entry


    def put(self, v, cache_adapter=CacheTx):
        """Add a serialized transaction to the queue.

        The transaction is deserialized with the cache adapter to obtain its
        hash, then stored in the index store (hash -> key) and the state
        store (key -> payload), and finally handed to the cache if one is set.

        :param v: Serialized transaction
        :param cache_adapter: Callable taking the chain spec and returning an
            object with deserialize() and a hash attribute
        :returns: Tuple of (state store key, transaction hash)
        """
        tx = cache_adapter(self.chain_spec)
        tx.deserialize(v)
        k = tx.hash
        n = self.counter.next()
        t = datetime.datetime.now().timestamp()
        s = to_key(t, n, k)
        self.index_store.put(k, s)
        self.state_store.put(s, v)
        if self.cache is not None:
            self.cache.put(self.chain_spec, tx)
        return (s, k,)


    def get(self, k):
        """Retrieve a transaction by hash.

        The state store is synced before the lookup.

        :param k: Transaction hash
        :raises NotLocalTxError: If the state store yields no value for the
            key, including when the lookup raised FileNotFoundError
        :returns: Tuple of (state store key, stored value)
        """
        v = None
        s = self.index_store.get(k)
        err = None
        try:
            self.state_store.sync()
            v = self.state_store.get(s)
        except FileNotFoundError as e:
            err = e
        if v is None:
            raise NotLocalTxError('could not find tx {}: {}'.format(k, err))
        return (s, v,)


    def by_state(self, state=0, not_state=0, limit=4096, strict=False, threshold=None):
        """List transaction hashes of queue items selected by state.

        Candidate keys come from state_store.list(state) and are processed
        in sorted key order (string sort), so the limit is applied in that
        order. The returned hashes are then sorted themselves.

        :param state: State value passed to state_store.list()
        :param not_state: Items having any of these bits set are skipped
        :param limit: Maximum number of hashes to return; a value of 0 or
            less disables the limit
        :param strict: If True, items having any bit set that is not part of
            state are skipped
        :param threshold: If not None, items for which state_store.modified()
            is greater than this value are skipped
        :returns: Sorted list of transaction hashes
        """
        hashes = []
        count = 0

        refs_state = self.state_store.list(state)
        refs_state.sort()

        for ref in refs_state:
            hsh = from_key(ref)[2]

            item_state = self.state_store.state(ref)

            if strict and item_state & state != item_state:
                continue

            logg.info('state {} {}'.format(ref, item_state))
            if item_state & not_state > 0:
                continue

            if threshold is not None:
                modified = self.state_store.modified(ref)
                if modified > threshold:
                    continue

            hashes.append(hsh)

            count += 1
            if limit > 0 and count == limit:
                break

        hashes.sort()
        return hashes


    def upcoming(self, limit=4096):
        """List hashes of items selected by the QUEUED state.

        :param limit: Maximum number of hashes, see by_state()
        :returns: Sorted list of transaction hashes
        """
        return self.by_state(state=self.QUEUED, limit=limit)


    def deferred(self, limit=4096, threshold=None):
        """List hashes of items selected by the DEFERRED state.

        :param limit: Maximum number of hashes, see by_state()
        :param threshold: Modification threshold, see by_state()
        :returns: Sorted list of transaction hashes
        """
        return self.by_state(state=self.DEFERRED, limit=limit, threshold=threshold)


    def failed(self, limit=4096):
        """List hashes of items selected by the LOCAL_ERROR or NODE_ERROR state.

        Each of the two states is queried separately with the given limit;
        the merged result is sorted and cut down to the limit.

        :param limit: Maximum number of hashes; a value of 0 or less disables
            the limit, consistent with by_state()
        :returns: Sorted list of transaction hashes
        """
        #return self.by_state(state=all_local_errors, limit=limit)
        r = []
        r += self.by_state(state=self.LOCAL_ERROR, limit=limit)
        r += self.by_state(state=self.NODE_ERROR, limit=limit)
        r.sort()
        # Only truncate for a positive limit; slicing with a limit of 0 would
        # otherwise discard every result of an unlimited query.
        if limit > 0 and len(r) > limit:
            r = r[:limit]
        return r


    def pending(self, limit=4096):
        """List hashes of items selected with state 0 in strict mode.

        :param limit: Maximum number of hashes, see by_state()
        :returns: Sorted list of transaction hashes
        """
        return self.by_state(state=0, limit=limit, strict=True)


    def reserve(self, k):
        """Load the queue entry for the tx hash and call reserve() on it.

        :param k: Transaction hash
        """
        self._load_entry(k).reserve()


    def enqueue(self, k):
        """Load the queue entry and call retry(), falling back to readysend().

        :param k: Transaction hash
        """
        entry = self._load_entry(k)
        # NOTE(review): StateTransitionInvalid is not imported in the visible
        # part of this module. If that holds for the whole file, any exception
        # raised by retry() surfaces as NameError instead of reaching the
        # readysend() fallback -- confirm and add the missing import.
        try:
            entry.retry()
        except StateTransitionInvalid:
            entry.readysend()


    def fail(self, k):
        """Load the queue entry for the tx hash and call sendfail() on it.

        :param k: Transaction hash
        """
        entry = self._load_entry(k)
        logg.debug('fail {}'.format(k))
        entry.sendfail()


    def final(self, k, block, tx, error=False):
        """Load the queue entry and finalize it as failed or succeeded.

        :param k: Transaction hash
        :param block: Passed through to the entry's fail() or succeed()
        :param tx: Passed through to the entry's fail() or succeed()
        :param error: If True the entry's fail() is called, otherwise succeed()
        """
        entry = self._load_entry(k)
        if error:
            entry.fail(block, tx)
        else:
            entry.succeed(block, tx)


    def send_start(self, k):
        """Load the queue entry, call reserve() on it and return it.

        :param k: Transaction hash
        :returns: The loaded and reserved QueueEntry
        """
        entry = self._load_entry(k)
        entry.reserve()
        return entry


    def send_end(self, k):
        """Load the queue entry for the tx hash and call sent() on it.

        :param k: Transaction hash
        """
        self._load_entry(k).sent()


    def is_reserved(self, k):
        """Test the loaded queue entry against the RESERVED state.

        :param k: Transaction hash
        :returns: Result of the entry's test() for self.RESERVED
        """
        return self._load_entry(k).test(self.RESERVED)


    def sync(self):
        """Sync the underlying state store."""
        self.state_store.sync()
|