From c4caab6a3a7a7e289f7e5ea6afbda63d281bb776 Mon Sep 17 00:00:00 2001 From: lash Date: Fri, 11 Mar 2022 19:38:12 +0000 Subject: [PATCH] implement get tx by state --- chainqueue/__init__.py | 2 + chainqueue/cache.py | 8 +++ chainqueue/entry.py | 116 ++++++++++++++++++++++++++++++++++++ chainqueue/state.py | 1 - chainqueue/store.py | 47 +++++++++++++++ chainqueue/tx.py | 131 ----------------------------------------- tests/base_shep.py | 35 +++++++++++ tests/test_entry.py | 65 ++++++++++++++++++++ tests/test_shep.py | 35 ++--------- 9 files changed, 279 insertions(+), 161 deletions(-) create mode 100644 chainqueue/cache.py create mode 100644 chainqueue/entry.py create mode 100644 chainqueue/store.py delete mode 100644 chainqueue/tx.py create mode 100644 tests/base_shep.py create mode 100644 tests/test_entry.py diff --git a/chainqueue/__init__.py b/chainqueue/__init__.py index d59d82c..5faad25 100644 --- a/chainqueue/__init__.py +++ b/chainqueue/__init__.py @@ -1 +1,3 @@ from .state import Status +from .entry import QueueEntry +from .store import Store diff --git a/chainqueue/cache.py b/chainqueue/cache.py new file mode 100644 index 0000000..8913213 --- /dev/null +++ b/chainqueue/cache.py @@ -0,0 +1,8 @@ +class Cache: + + def put(self, chain_spec, tx): + raise NotImplementedError() + + + def get(self, chain_spec, tx_hash): + raise NotImplementedError() diff --git a/chainqueue/entry.py b/chainqueue/entry.py new file mode 100644 index 0000000..806cbac --- /dev/null +++ b/chainqueue/entry.py @@ -0,0 +1,116 @@ +# standard imports +import logging + +logg = logging.getLogger(__name__) + + +class QueueEntry: + + def __init__(self, store, tx_hash): + self.store = store + self.tx_hash = tx_hash + self.signed_tx = None + self.seq = None + self.k = None + self.synced = False + + + def __to_key(self, k, v): + return '{:>010s}_{}'.format(k, v) + + + def create(self, seq, signed_tx): + n = str(seq) + self.k = self.__to_key(n, self.tx_hash) + self.store.put(self.k, signed_tx) + self.store.put_seq(self.tx_hash, n) + self.synced = True + + + def load(self): + seq = self.store.get_seq(self.tx_hash) + self.k = self.__to_key(seq, self.tx_hash) + self.signed_tx = self.store.get(self.k) + self.synced = True + + + def __match_state(self, state): + return bool(self.store.state(self.k) & state) + + + def waitforfunds(self): + if self.__match_state(self.store.INSUFFICIENT_FUNDS): + return + self.store.move(self.k, self.store.INSUFFICIENT_FUNDS) + + + def fubar(self): + if self.__match_state(self.store.UNKNOWN_ERROR): + return + self.store.set(self.k, self.store.UNKNOWN_ERROR) + + + def reject(self): + if self.__match_state(self.store.NODE_ERROR): + return + self.store.set(self.k, self.store.NODE_ERROR) + + + def override(self, manual=False): + if manual: + self.store.set(self.k, self.store.OBSOLETE | self.store.MANUAL) + else: + self.store.set(self.k, self.store.OBSOLETE) + + + def manual(self): + self.store.set(self.k, self.store.MANUAL) + + + def retry(self): + if self.__match_state(self.store.QUEUED): + return + self.store.change(self.k, self.store.QUEUED, self.store.INSUFFICIENT_FUNDS) + + + def readysend(self): + if self.__match_state(self.store.QUEUED): + return + self.store.change(self.k, self.store.QUEUED, self.store.INSUFFICIENT_FUNDS) + + + def sent(self): + if self.__match_state(self.store.IN_NETWORK): + return + self.store.change(self.k, self.store.IN_NETWORK, self.store.RESERVED | self.store.DEFERRED | self.store.QUEUED | self.store.LOCAL_ERROR | self.store.NODE_ERROR) + + + def sendfail(self): + if self.__match_state(self.store.NODE_ERROR): + return + self.store.change(self.k, self.store.LOCAL_ERROR | self.store.DEFERRED, self.store.RESERVED | self.store.QUEUED | self.store.INSUFFICIENT_FUNDS) + + + def reserve(self): + if self.__match_state(self.store.RESERVED): + return + self.store.change(self.k, self.store.RESERVED, self.store.QUEUED) + + + def fail(self, block): + if self.__match_state(self.store.NETWORK_ERROR): + return + self.store.set(self.k, self.store.NETWORK_ERROR) + if self.cache: + self.cache.set_block(self.tx_hash, block) + + + def cancel(self, confirmed=False): + if confirmed: + self.store.change(self.k, self.store.OBSOLETE | self.store.FINAL, self.store.RESERVED | self.store.QUEUED) + else: + self.store.change(self.k, self.store.OBSOLETE, self.store.RESERVED | self.store.QUEUED) + + + def succeed(self, block): + self.store.set(self.k, self.store.FINAL) diff --git a/chainqueue/state.py b/chainqueue/state.py index ebb2ae7..27d8bb8 100644 --- a/chainqueue/state.py +++ b/chainqueue/state.py @@ -15,7 +15,6 @@ class Verify: try: m = getattr(self, to_state_name) except AttributeError: - logg.debug('foo {}'.format(to_state_name)) return None r = m(state_store, from_state) diff --git a/chainqueue/store.py b/chainqueue/store.py new file mode 100644 index 0000000..5d87e70 --- /dev/null +++ b/chainqueue/store.py @@ -0,0 +1,47 @@ +# standard imports +import logging +import re + +logg = logging.getLogger(__name__) + + +re_u = r'^[^_][_A-Z]+$' +class Store: + + def __init__(self, state_store, index_store): + self.state_store = state_store + self.index_store = index_store + for s in dir(self.state_store): + if not re.match(re_u, s): + continue + v = self.state_store.from_name(s) + setattr(self, s, v) + for v in ['put', 'get', 'state', 'change', 'set', 'unset']: + setattr(self, v, getattr(self.state_store, v)) + + + def put(self, k, v): + self.state_store.put(k, v) + + + def get(self, k, v): + return self.state_store.get(k) + + + def put_seq(self, k, seq): + self.index_store.put(k, seq) + + + def get_seq(self, k): + return self.index_store.get(k) + + + def list(self, state=0, limit=4096, state_exact=False): + hashes = [] + i = 0 + for k in self.state_store.list(state): + if state_exact: + if self.state_store.state(k) & state == state: + continue + hashes.append(k) + return hashes diff --git a/chainqueue/tx.py b/chainqueue/tx.py deleted file mode 100644 index 4c51d36..0000000 --- a/chainqueue/tx.py +++ /dev/null @@ -1,131 +0,0 @@ -# standard imports -import logging - -logg = logging.getLogger(__name__) - - -class Tx: - - def __init__(self, state_store, index_store, tx_hash, cache=None): - self.state_store = state_store - self.index_store = index_store - self.cache = cache - self.tx_hash = tx_hash - self.signed_tx = None - self.seq = None - self.k = None - self.synced = False - - - def __to_key(self, k, v): - return '{:>010s}_{}'.format(k, v) - - - def create(self, seq, signed_tx): - n = str(seq) - self.k = self.__to_key(n, self.tx_hash) - self.state_store.put(self.k, signed_tx) - self.index_store.put(self.tx_hash, n) - self.synced = True - - - def load(self): - seq = self.index_store.get(self.tx_hash) - self.k = self.__to_key(seq, self.tx_hash) - self.signed_tx = self.state_store.get(self.k) - self.synced = True - - - def __match_state(self, state): - return bool(self.store.state(self.k) & state) - - - def waitforfunds(self): - if self.__match_state(self.store.INSUFFICIENT_FUNDS): - return - self.state.move(self.k, self.store.INSUFFICIENT_FUNDS) - - - def fubar(self): - if self.__match_state(self.store.UNKNOWN_ERROR): - return - self.state.set(self.k, self.store.UNKNOWN_ERROR) - - - def reject(self): - if self.__match_state(self.store.NODE_ERROR): - return - self.state.set(self.k, self.store.NODE_ERROR) - - - def override(self, manual=False): - if manual: - self.state.set(self.k, self.store.OBSOLETE | self.store.MANUAL) - else: - self.state.set(self.k, self.store.OBSOLETE) - - - def manual(self): - self.state.set(self.k, self.store.MANUAL) - - - def retry(self): - if self.__match_state(self.store.QUEUED): - return - self.state.change(self.k, self.store.QUEUED, self.store.INSUFFICIENT_FUNDS) - - - def readysend(self): - if self.__match_state(self.store.QUEUED): - return - self.state.change(self.k, self.store.QUEUED, self.store.INSUFFICIENT_FUNDS) - - - def sent(self): - if self.__match_state(self.store.IN_NETWORK): - return - self.state.change(self.k, self.state.IN_NETWORK, self.state.RESERVED | self.state.DEFERRED | self.state.QUEUED | self.state.LOCAL_ERROR | self.state.NODE_ERROR) - - - def sendfail(self): - if self.__match_state(self.store.NODE_ERROR): - return - self.state.change(self.k, self.state.LOCAL_ERROR | self.state.DEFERRED, self.state.RESERVED | self.state.QUEUED | self.state.INSUFFICIENT_FUNDS) - - - def reserve(self): - if self.__match_state(self.store.RESERVED): - return - self.state.change(self.k, self.state.RESERVED, self.state.QUEUED) - - - def minefail(self, block): - if self.__match_state(self.store.NETWORK_ERROR): - return - self.state.set(self.k, self.state.NETWORK_ERROR) - if self.cache: - self.cache.set_block(self.tx_hash, block) - - - def cancel(self, confirmed=False): - if confirmed: - self.state.change(self.k, self.state.OBSOLETE | self.state.FINAL, self.state.RESERVED | self.state.QUEUED) - else: - self.state.change(self.k, self.state.OBSOLETE, self.state.RESERVED | self.state.QUEUED) - - - def success(self, block): - self.state.set(self.state.FINAL) - if self.cache: - self.cache.set_block(self.tx_hash, block) - - - def get(status=0, limit=4096, status_exact=True): - hashes = [] - i = 0 - for k in self.state.list(status): - if status_exact: - if self.state.status(k) != status: - continue - hashes.append(k) - return k diff --git a/tests/base_shep.py b/tests/base_shep.py new file mode 100644 index 0000000..74c0c77 --- /dev/null +++ b/tests/base_shep.py @@ -0,0 +1,35 @@ +# standard imports +import tempfile +import unittest + +# external imports +from shep.store.file import SimpleFileStoreFactory + +# local imports +from chainqueue import ( + Store, + Status, + ) + + +class MockIndexStore: + + def __init__(self): + self.store = {} + + + def put(self, k, v): + self.store[k] = v + + + def get(self, k): + return self.store.get(k) + + +class TestShepBase(unittest.TestCase): + + def setUp(self): + self.path = tempfile.mkdtemp() + factory = SimpleFileStoreFactory(self.path).add + self.state = Status(factory) + self.store = Store(self.state, MockIndexStore()) diff --git a/tests/test_entry.py b/tests/test_entry.py new file mode 100644 index 0000000..2c16ca5 --- /dev/null +++ b/tests/test_entry.py @@ -0,0 +1,65 @@ +# standard imports +import os +import logging +import unittest + +# external imports +from hexathon import add_0x + +# local imports +from chainqueue import QueueEntry + +# test imports +from tests.base_shep import TestShepBase + +logging.basicConfig(level=logging.DEBUG) +logg = logging.getLogger() + + +class MockTranslator: + pass + + +class TestShep(TestShepBase): + + def test_entry_get(self): + tx_hash_one = add_0x(os.urandom(20).hex()) + signed_tx = add_0x(os.urandom(128).hex()) + nonce = 42 + entry = QueueEntry(self.store, tx_hash_one) + entry.create(nonce, signed_tx) + + tx_hash_two = add_0x(os.urandom(20).hex()) + signed_tx = add_0x(os.urandom(128).hex()) + nonce = 42 + entry = QueueEntry(self.store, tx_hash_two) + entry.create(nonce, signed_tx) + + txs = self.store.list() + self.assertEqual(len(txs), 2) + + entry = QueueEntry(self.store, tx_hash_one) + entry.load() + entry.sent() + + txs = self.store.list() + self.assertEqual(len(txs), 1) + + txs = self.store.list(state=self.store.IN_NETWORK) + self.assertEqual(len(txs), 1) + + entry.succeed(0) + txs = self.store.list() + self.assertEqual(len(txs), 1) + + entry = QueueEntry(self.store, tx_hash_two) + entry.load() + entry.sent() + + txs = self.store.list(state=self.store.IN_NETWORK) + self.assertEqual(len(txs), 2) + + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_shep.py b/tests/test_shep.py index 796b9f2..fba1b57 100644 --- a/tests/test_shep.py +++ b/tests/test_shep.py @@ -2,43 +2,21 @@ import os import logging import unittest -import tempfile # external imports from hexathon import add_0x -from shep.store.file import SimpleFileStoreFactory from shep.error import StateTransitionInvalid # local imports -from chainqueue import Status -from chainqueue.tx import Tx +from chainqueue import QueueEntry + +# test imports +from tests.base_shep import TestShepBase logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger() -class MockIndexStore: - - def __init__(self): - self.store = {} - - - def put(self, k, v): - self.store[k] = v - - - def get(self, k): - return self.store.get(k) - - -class TestShepBase(unittest.TestCase): - - def setUp(self): - self.path = tempfile.mkdtemp() - factory = SimpleFileStoreFactory(self.path).add - self.state = Status(factory) - - class TestShep(TestShepBase): def test_shep_setup(self): @@ -49,11 +27,10 @@ class TestShep(TestShepBase): tx_hash = add_0x(os.urandom(20).hex()) signed_tx = add_0x(os.urandom(128).hex()) nonce = 42 - mock_store = MockIndexStore() - tx = Tx(self.state, mock_store, tx_hash) + tx = QueueEntry(self.store, tx_hash) tx.create(nonce, signed_tx) - tx_retrieved = Tx(self.state, mock_store, tx_hash) + tx_retrieved = QueueEntry(self.store, tx_hash) tx_retrieved.load() self.assertEqual(tx_retrieved.signed_tx, signed_tx)