From e87ec0cd4c18a4b196988ec76cdf0bc585545bf3 Mon Sep 17 00:00:00 2001 From: lash Date: Fri, 29 Apr 2022 06:26:43 +0000 Subject: [PATCH] Upgrade chainqueue, chainsyncer --- chaind/adapters/fs.py | 3 +++ chaind/error.py | 4 ++++ chaind/filter.py | 25 +++++++++++++++++++++---- requirements.txt | 4 ++-- setup.cfg | 2 +- 5 files changed, 31 insertions(+), 7 deletions(-) diff --git a/chaind/adapters/fs.py b/chaind/adapters/fs.py index 4804639..d9e8722 100644 --- a/chaind/adapters/fs.py +++ b/chaind/adapters/fs.py @@ -58,6 +58,9 @@ class ChaindFsAdapter(ChaindAdapter): def succeed(self, block, tx): + if self.store.is_reserved(tx.hash): + raise QueueLockError(tx.hash) + return self.store.final(tx.hash, block, tx, error=False) diff --git a/chaind/error.py b/chaind/error.py index bf62e83..5561014 100644 --- a/chaind/error.py +++ b/chaind/error.py @@ -16,3 +16,7 @@ class ClientBlockError(BlockingIOError): class ClientInputError(ValueError): pass + + +class QueueLockError(Exception): + pass diff --git a/chaind/filter.py b/chaind/filter.py index dd45408..4140517 100644 --- a/chaind/filter.py +++ b/chaind/filter.py @@ -1,15 +1,21 @@ # standard imports import logging +import time # external imports from chainlib.status import Status as TxStatus from chainsyncer.filter import SyncFilter from chainqueue.error import NotLocalTxError +# local imports +from .error import QueueLockError + logg = logging.getLogger(__name__) class StateFilter(SyncFilter): + delay_limit = 3.0 + def __init__(self, adapter, throttler=None): self.adapter = adapter self.throttler = throttler @@ -22,10 +28,21 @@ class StateFilter(SyncFilter): logg.debug('skipping not local transaction {}'.format(tx.hash)) return False - if tx.status == TxStatus.SUCCESS: - self.adapter.succeed(block, tx) - else: - self.adapter.fail(block, tx) + delay = 0.01 + while True: + if delay > self.delay_limit: + raise QueueLockError('The queue lock for tx {} seems to be stuck. Human meddling needed.'.format(tx.hash)) + try: + if tx.status == TxStatus.SUCCESS: + self.adapter.succeed(block, tx) + else: + self.adapter.fail(block, tx) + break + except QueueLockError as e: + logg.debug('queue item {} is blocked, will retry: {}'.format(tx.hash, e)) + time.sleep(delay) + delay *= 2 + if self.throttler != None: self.throttler.dec(tx.hash) diff --git a/requirements.txt b/requirements.txt index 57e2761..3e94f77 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ chainlib~=0.1.1 -chainqueue~=0.1.2 -chainsyncer~=0.4.0 +chainqueue~=0.1.5 +chainsyncer~=0.4.2 confini~=0.6.0 funga~=0.5.2 pyxdg~=0.26 diff --git a/setup.cfg b/setup.cfg index e09b249..23d061a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chaind -version = 0.1.2 +version = 0.1.3 description = Base package for chain queue service author = Louis Holbrook author_email = dev@holbrook.no