diff --git a/CHANGELOG b/CHANGELOG index d098d2b..824b8aa 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,6 @@ +- 0.2.0 + * primitive race condition handling between fs access of sync and queue + * re-enable throttling based on in-flight transaction count - 0.1.2 * add settings object - 0.1.0 diff --git a/chaind/adapters/base.py b/chaind/adapters/base.py index 6e5315d..7e46bb0 100644 --- a/chaind/adapters/base.py +++ b/chaind/adapters/base.py @@ -1,10 +1,28 @@ # external imports from chainqueue import Store as QueueStore +# local imports +from chaind.error import BackendIntegrityError + class ChaindAdapter: + race_delay = 0.1 + def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0): self.cache_adapter = cache_adapter self.dispatcher = dispatcher - self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache) + err = None + for i in range(3): + try: + self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache) + err = None + break + except FileNotFoundError as e: + logg.debug('queuestore instantiation failed, possible race condition (will try again): {}'.format(tx_hash, e)) + err = e + time.sleep(self.race_delay) + continue + + if err != None: + raise BackendIntegrityError(err) diff --git a/chaind/adapters/fs.py b/chaind/adapters/fs.py index 65f59d0..3880f67 100644 --- a/chaind/adapters/fs.py +++ b/chaind/adapters/fs.py @@ -10,12 +10,12 @@ from chainqueue.store.fs import ( IndexStore, CounterStore, ) -from chainqueue.error import BackendIntegrityError from shep.store.file import SimpleFileStoreFactory from shep.error import StateInvalid # local imports from .base import ChaindAdapter +from chaind.error import BackendIntegrityError logg = logging.getLogger(__name__) @@ -38,13 +38,19 @@ class ChaindFsAdapter(ChaindAdapter): def get(self, tx_hash): v = None - try: - v = self.store.get(tx_hash) - except StateInvalid as e: - logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e)) - return None - except FileNotFoundError: - pass + err = None + for i in range(3): + try: + v = self.store.get(tx_hash) + err = None + except StateInvalid as e: + logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e)) + return None + except FileNotFoundError as e: + err = e + time.sleep(self.race_delay) + logg.debug('queuestore get {} failed, possible race condition (will try again): {}'.format(tx_hash, e)) + continue if v ==None: raise BackendIntegrityError(tx_hash) return v[1] @@ -52,7 +58,7 @@ class ChaindFsAdapter(ChaindAdapter): def upcoming(self, limit=0): real_limit = 0 - in_flight = 0 + in_flight = [] if limit > 0: in_flight = self.store.by_state(state=self.store.IN_NETWORK, not_state=self.store.FINAL) real_limit = limit - len(in_flight) diff --git a/chaind/error.py b/chaind/error.py index 5561014..d3573da 100644 --- a/chaind/error.py +++ b/chaind/error.py @@ -20,3 +20,7 @@ class ClientInputError(ValueError): class QueueLockError(Exception): pass + + +class BackendIntegrityError(Exception): + pass diff --git a/chaind/filter.py b/chaind/filter.py index 07319d7..4c9c561 100644 --- a/chaind/filter.py +++ b/chaind/filter.py @@ -5,14 +5,14 @@ import time # external imports from chainlib.status import Status as TxStatus from chainsyncer.filter import SyncFilter -from chainqueue.error import ( - NotLocalTxError, - BackendIntegrityError, - ) +from chainqueue.error import NotLocalTxError from chaind.adapters.fs import ChaindFsAdapter # local imports -from .error import QueueLockError +from .error import ( + QueueLockError, + BackendIntegrityError, + ) logg = logging.getLogger(__name__) @@ -20,6 +20,7 @@ logg = logging.getLogger(__name__) class StateFilter(SyncFilter): delay_limit = 3.0 + race_delay = 0.1 def __init__(self, chain_spec, adapter_path, tx_adapter, throttler=None): self.chain_spec = chain_spec @@ -41,6 +42,7 @@ class StateFilter(SyncFilter): ) except BackendIntegrityError as e: logg.error('adapter instantiation failed: {}, one more try'.format(e)) + time.sleep(self.race_delay) continue try: @@ -50,6 +52,7 @@ class StateFilter(SyncFilter): return False except BackendIntegrityError as e: logg.error('adapter instantiation failed: {}, one more try'.format(e)) + time.sleep(self.race_delay) continue break diff --git a/setup.cfg b/setup.cfg index 0210228..cef23db 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chaind -version = 0.2.0 +version = 0.2.1 description = Base package for chain queue service author = Louis Holbrook author_email = dev@holbrook.no diff --git a/tests/test_fs.py b/tests/test_fs.py index 1fa250a..68792e6 100644 --- a/tests/test_fs.py +++ b/tests/test_fs.py @@ -74,7 +74,7 @@ class TestChaindFs(TestChaindFsBase): data = os.urandom(128).hex() hsh = self.adapter.put(data) - fltr = StateFilter(self.adapter) + fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter) tx = MockTx(hsh) fltr.filter(None, None, tx) @@ -85,7 +85,7 @@ class TestChaindFs(TestChaindFsBase): data = os.urandom(128).hex() hsh = self.adapter.put(data) - fltr = StateFilter(self.adapter) + fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter) tx = MockTx(hsh, TxStatus.ERROR) fltr.filter(None, None, tx)