diff --git a/chaind/adapters/fs.py b/chaind/adapters/fs.py index 918e0ae..65f59d0 100644 --- a/chaind/adapters/fs.py +++ b/chaind/adapters/fs.py @@ -10,6 +10,7 @@ from chainqueue.store.fs import ( IndexStore, CounterStore, ) +from chainqueue.error import BackendIntegrityError from shep.store.file import SimpleFileStoreFactory from shep.error import StateInvalid @@ -42,16 +43,24 @@ class ChaindFsAdapter(ChaindAdapter): except StateInvalid as e: logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e)) return None + except FileNotFoundError: + pass + if v ==None: + raise BackendIntegrityError(tx_hash) return v[1] def upcoming(self, limit=0): + real_limit = 0 + in_flight = 0 if limit > 0: - r = self.store.by_state(state=self.store.IN_NETWORK, not_state=self.store.FINAL) - limit -= len(r) - if limit <= 0: + in_flight = self.store.by_state(state=self.store.IN_NETWORK, not_state=self.store.FINAL) + real_limit = limit - len(in_flight) + if real_limit <= 0: return [] - return self.store.upcoming(limit=limit) + r = self.store.upcoming(limit=real_limit) + logg.info('upcoming returning {} upcoming from limit {} less {} active in-flight txs'.format(len(r), limit, len(in_flight))) + return r def pending(self): diff --git a/chaind/filter.py b/chaind/filter.py index 4140517..07319d7 100644 --- a/chaind/filter.py +++ b/chaind/filter.py @@ -5,28 +5,57 @@ import time # external imports from chainlib.status import Status as TxStatus from chainsyncer.filter import SyncFilter -from chainqueue.error import NotLocalTxError +from chainqueue.error import ( + NotLocalTxError, + BackendIntegrityError, + ) +from chaind.adapters.fs import ChaindFsAdapter # local imports from .error import QueueLockError logg = logging.getLogger(__name__) + class StateFilter(SyncFilter): delay_limit = 3.0 - def __init__(self, adapter, throttler=None): - self.adapter = adapter + def __init__(self, chain_spec, adapter_path, tx_adapter, throttler=None): + self.chain_spec = chain_spec + self.adapter_path = adapter_path + self.tx_adapter = tx_adapter self.throttler = throttler def filter(self, conn, block, tx, session=None): - try: - cache_tx = self.adapter.get(tx.hash) - except NotLocalTxError: - logg.debug('skipping not local transaction {}'.format(tx.hash)) - return False + cache_tx = None + for i in range(3): + queue_adapter = None + try: + queue_adapter = ChaindFsAdapter( + self.chain_spec, + self.adapter_path, + self.tx_adapter, + None, + ) + except BackendIntegrityError as e: + logg.error('adapter instantiation failed: {}, one more try'.format(e)) + continue + + try: + cache_tx = queue_adapter.get(tx.hash) + except NotLocalTxError: + logg.debug('skipping not local transaction {}'.format(tx.hash)) + return False + except BackendIntegrityError as e: + logg.error('adapter instantiation failed: {}, one more try'.format(e)) + continue + + break + + if cache_tx == None: + raise NotLocalTxError(tx.hash) delay = 0.01 while True: @@ -34,9 +63,9 @@ class StateFilter(SyncFilter): raise QueueLockError('The queue lock for tx {} seems to be stuck. Human meddling needed.'.format(tx.hash)) try: if tx.status == TxStatus.SUCCESS: - self.adapter.succeed(block, tx) + queue_adapter.succeed(block, tx) else: - self.adapter.fail(block, tx) + queue_adapter.fail(block, tx) break except QueueLockError as e: logg.debug('queue item {} is blocked, will retry: {}'.format(tx.hash, e)) diff --git a/chaind/session.py b/chaind/session.py index ea9ad92..3d79f09 100644 --- a/chaind/session.py +++ b/chaind/session.py @@ -104,7 +104,6 @@ class SessionController: logg.error('invalid input "{}"'.format(data_in_str)) raise ClientInputError() - logg.info('recv {} bytes'.format(len(data))) return (srvs, data,)