implement get tx by state

This commit is contained in:
lash
2022-03-11 19:38:12 +00:00
parent 539d3384a6
commit c4caab6a3a
9 changed files with 279 additions and 161 deletions

View File

@@ -1 +1,3 @@
from .state import Status
from .entry import QueueEntry
from .store import Store

8
chainqueue/cache.py Normal file
View File

@@ -0,0 +1,8 @@
class Cache:
def put(self, chain_spec, tx):
raise NotImplementedError()
def get(self, chain_spec, tx_hash):
raise NotImplementedError()

116
chainqueue/entry.py Normal file
View File

@@ -0,0 +1,116 @@
# standard imports
import logging
logg = logging.getLogger(__name__)
class QueueEntry:
def __init__(self, store, tx_hash):
self.store = store
self.tx_hash = tx_hash
self.signed_tx = None
self.seq = None
self.k = None
self.synced = False
def __to_key(self, k, v):
return '{:>010s}_{}'.format(k, v)
def create(self, seq, signed_tx):
n = str(seq)
self.k = self.__to_key(n, self.tx_hash)
self.store.put(self.k, signed_tx)
self.store.put_seq(self.tx_hash, n)
self.synced = True
def load(self):
seq = self.store.get_seq(self.tx_hash)
self.k = self.__to_key(seq, self.tx_hash)
self.signed_tx = self.store.get(self.k)
self.synced = True
def __match_state(self, state):
return bool(self.store.state(self.k) & state)
def waitforfunds(self):
if self.__match_state(self.store.INSUFFICIENT_FUNDS):
return
self.store.move(self.k, self.store.INSUFFICIENT_FUNDS)
def fubar(self):
if self.__match_state(self.store.UNKNOWN_ERROR):
return
self.store.set(self.k, self.store.UNKNOWN_ERROR)
def reject(self):
if self.__match_state(self.store.NODE_ERROR):
return
self.store.set(self.k, self.store.NODE_ERROR)
def override(self, manual=False):
if manual:
self.store.set(self.k, self.store.OBSOLETE | self.store.MANUAL)
else:
self.store.set(self.k, self.store.OBSOLETE)
def manual(self):
self.store.set(self.k, self.store.MANUAL)
def retry(self):
if self.__match_state(self.store.QUEUED):
return
self.store.change(self.k, self.store.QUEUED, self.store.INSUFFICIENT_FUNDS)
def readysend(self):
if self.__match_state(self.store.QUEUED):
return
self.store.change(self.k, self.store.QUEUED, self.store.INSUFFICIENT_FUNDS)
def sent(self):
if self.__match_state(self.store.IN_NETWORK):
return
self.store.change(self.k, self.store.IN_NETWORK, self.store.RESERVED | self.store.DEFERRED | self.store.QUEUED | self.store.LOCAL_ERROR | self.store.NODE_ERROR)
def sendfail(self):
if self.__match_state(self.store.NODE_ERROR):
return
self.store.change(self.k, self.store.LOCAL_ERROR | self.store.DEFERRED, self.store.RESERVED | self.store.QUEUED | self.store.INSUFFICIENT_FUNDS)
def reserve(self):
if self.__match_state(self.store.RESERVED):
return
self.store.change(self.k, self.store.RESERVED, self.store.QUEUED)
def fail(self, block):
if self.__match_state(self.store.NETWORK_ERROR):
return
self.store.set(self.k, self.store.NETWORK_ERROR)
if self.cache:
self.cache.set_block(self.tx_hash, block)
def cancel(self, confirmed=False):
if confirmed:
self.store.change(self.k, self.store.OBSOLETE | self.store.FINAL, self.store.RESERVED | self.store.QUEUED)
else:
self.store.change(self.k, self.store.OBSOLETE, self.store.RESERVED | self.store.QUEUED)
def succeed(self, block):
self.store.set(self.k, self.store.FINAL)

View File

@@ -15,7 +15,6 @@ class Verify:
try:
m = getattr(self, to_state_name)
except AttributeError:
logg.debug('foo {}'.format(to_state_name))
return None
r = m(state_store, from_state)

47
chainqueue/store.py Normal file
View File

@@ -0,0 +1,47 @@
# standard imports
import logging
import re
logg = logging.getLogger(__name__)
re_u = r'^[^_][_A-Z]+$'
class Store:
def __init__(self, state_store, index_store):
self.state_store = state_store
self.index_store = index_store
for s in dir(self.state_store):
if not re.match(re_u, s):
continue
v = self.state_store.from_name(s)
setattr(self, s, v)
for v in ['put', 'get', 'state', 'change', 'set', 'unset']:
setattr(self, v, getattr(self.state_store, v))
def put(self, k, v):
self.state_store.put(k, v)
def get(self, k, v):
return self.state_store.get(k)
def put_seq(self, k, seq):
self.index_store.put(k, seq)
def get_seq(self, k):
return self.index_store.get(k)
def list(self, state=0, limit=4096, state_exact=False):
hashes = []
i = 0
for k in self.state_store.list(state):
if state_exact:
if self.state_store.state(k) & state == state:
continue
hashes.append(k)
return hashes

View File

@@ -1,131 +0,0 @@
# standard imports
import logging
logg = logging.getLogger(__name__)
class Tx:
def __init__(self, state_store, index_store, tx_hash, cache=None):
self.state_store = state_store
self.index_store = index_store
self.cache = cache
self.tx_hash = tx_hash
self.signed_tx = None
self.seq = None
self.k = None
self.synced = False
def __to_key(self, k, v):
return '{:>010s}_{}'.format(k, v)
def create(self, seq, signed_tx):
n = str(seq)
self.k = self.__to_key(n, self.tx_hash)
self.state_store.put(self.k, signed_tx)
self.index_store.put(self.tx_hash, n)
self.synced = True
def load(self):
seq = self.index_store.get(self.tx_hash)
self.k = self.__to_key(seq, self.tx_hash)
self.signed_tx = self.state_store.get(self.k)
self.synced = True
def __match_state(self, state):
return bool(self.store.state(self.k) & state)
def waitforfunds(self):
if self.__match_state(self.store.INSUFFICIENT_FUNDS):
return
self.state.move(self.k, self.store.INSUFFICIENT_FUNDS)
def fubar(self):
if self.__match_state(self.store.UNKNOWN_ERROR):
return
self.state.set(self.k, self.store.UNKNOWN_ERROR)
def reject(self):
if self.__match_state(self.store.NODE_ERROR):
return
self.state.set(self.k, self.store.NODE_ERROR)
def override(self, manual=False):
if manual:
self.state.set(self.k, self.store.OBSOLETE | self.store.MANUAL)
else:
self.state.set(self.k, self.store.OBSOLETE)
def manual(self):
self.state.set(self.k, self.store.MANUAL)
def retry(self):
if self.__match_state(self.store.QUEUED):
return
self.state.change(self.k, self.store.QUEUED, self.store.INSUFFICIENT_FUNDS)
def readysend(self):
if self.__match_state(self.store.QUEUED):
return
self.state.change(self.k, self.store.QUEUED, self.store.INSUFFICIENT_FUNDS)
def sent(self):
if self.__match_state(self.store.IN_NETWORK):
return
self.state.change(self.k, self.state.IN_NETWORK, self.state.RESERVED | self.state.DEFERRED | self.state.QUEUED | self.state.LOCAL_ERROR | self.state.NODE_ERROR)
def sendfail(self):
if self.__match_state(self.store.NODE_ERROR):
return
self.state.change(self.k, self.state.LOCAL_ERROR | self.state.DEFERRED, self.state.RESERVED | self.state.QUEUED | self.state.INSUFFICIENT_FUNDS)
def reserve(self):
if self.__match_state(self.store.RESERVED):
return
self.state.change(self.k, self.state.RESERVED, self.state.QUEUED)
def minefail(self, block):
if self.__match_state(self.store.NETWORK_ERROR):
return
self.state.set(self.k, self.state.NETWORK_ERROR)
if self.cache:
self.cache.set_block(self.tx_hash, block)
def cancel(self, confirmed=False):
if confirmed:
self.state.change(self.k, self.state.OBSOLETE | self.state.FINAL, self.state.RESERVED | self.state.QUEUED)
else:
self.state.change(self.k, self.state.OBSOLETE, self.state.RESERVED | self.state.QUEUED)
def success(self, block):
self.state.set(self.state.FINAL)
if self.cache:
self.cache.set_block(self.tx_hash, block)
def get(status=0, limit=4096, status_exact=True):
hashes = []
i = 0
for k in self.state.list(status):
if status_exact:
if self.state.status(k) != status:
continue
hashes.append(k)
return k