From 95930ef7de1b9cbfc145931788e8e647f1c94a66 Mon Sep 17 00:00:00 2001 From: lash Date: Tue, 12 Apr 2022 13:44:29 +0000 Subject: [PATCH] Handle duplicate tx attempts --- chainqueue/enum.py | 1 - chainqueue/error.py | 5 +++++ chainqueue/store/base.py | 6 ++++-- chainqueue/store/fs.py | 14 ++++++++++++++ tests/test_store.py | 10 ++++++++++ 5 files changed, 33 insertions(+), 3 deletions(-) diff --git a/chainqueue/enum.py b/chainqueue/enum.py index 1ebc03f..35d67f1 100644 --- a/chainqueue/enum.py +++ b/chainqueue/enum.py @@ -42,7 +42,6 @@ class StatusEnum(enum.IntEnum): """ PENDING = 0 """Transaction has been added but no processing has been performed""" - SENDFAIL = StatusBits.DEFERRED | StatusBits.LOCAL_ERROR """Temporary error occurred when sending transaction to node""" RETRY = StatusBits.QUEUED | StatusBits.DEFERRED diff --git a/chainqueue/error.py b/chainqueue/error.py index c4b4d5e..518c7cf 100644 --- a/chainqueue/error.py +++ b/chainqueue/error.py @@ -29,3 +29,8 @@ class BackendIntegrityError(ChainQueueException): """ pass + +class DuplicateTxError(ChainQueueException): + """Backend already knows transaction + """ + pass diff --git a/chainqueue/store/base.py b/chainqueue/store/base.py index 5dc537d..f40e7ef 100644 --- a/chainqueue/store/base.py +++ b/chainqueue/store/base.py @@ -6,7 +6,9 @@ import logging # local imports from chainqueue.cache import CacheTx from chainqueue.entry import QueueEntry -from chainqueue.error import NotLocalTxError +from chainqueue.error import ( + NotLocalTxError, + ) logg = logging.getLogger(__name__) @@ -53,8 +55,8 @@ class Store: n = self.counter.next() t = datetime.datetime.now().timestamp() s = to_key(t, n, k) - self.state_store.put(s, v) self.index_store.put(k, s) + self.state_store.put(s, v) if self.cache != None: self.cache.put(self.chain_spec, tx) return (s, k,) diff --git a/chainqueue/store/fs.py b/chainqueue/store/fs.py index aa49a63..94b410d 100644 --- a/chainqueue/store/fs.py +++ b/chainqueue/store/fs.py @@ -5,6 +5,9 @@ import logging # external imports from leveldir.hex import HexDir +# local imports +from chainqueue.error import DuplicateTxError + logg = logging.getLogger(__name__) @@ -14,10 +17,21 @@ class IndexStore(HexDir): os.path.join(root_path, 'contents') self.store = HexDir(root_path, digest_bytes) + + def __exists(self, k): + existing = None + try: + existing = self.get(k) + except FileNotFoundError: + pass + return existing != None + def put(self, k, v): kb = bytes.fromhex(k) vb = v.encode('utf-8') + if self.__exists(k): + raise DuplicateTxError(k) self.store.add(kb, vb) diff --git a/tests/test_store.py b/tests/test_store.py index a352140..39c6db2 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -12,6 +12,7 @@ from chainqueue.store.fs import ( IndexStore, CounterStore, ) +from chainqueue.error import DuplicateTxError logging.basicConfig(level=logging.DEBUG) @@ -48,5 +49,14 @@ class TestStoreImplementations(unittest.TestCase): self.assertEqual(v, 2) + def test_duplicate(self): + store = IndexStore(self.path) + hx = os.urandom(32).hex() + data = 'foo_bar_baz' + store.put(hx, data) + with self.assertRaises(DuplicateTxError): + store.put(hx, data) + + if __name__ == '__main__': unittest.main()