diff --git a/CHANGELOG b/CHANGELOG index 2df0cb5..a4af308 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,5 @@ +- 0.2.9 + * Minimize instantiations of adapters in filter execution - 0.2.8 * Upgrade chainsyncer - 0.2.7 diff --git a/chaind/filter.py b/chaind/filter.py index ff4bed0..184fb1e 100644 --- a/chaind/filter.py +++ b/chaind/filter.py @@ -26,43 +26,69 @@ class StateFilter(SyncFilter): self.adapter_path = adapter_path self.tx_adapter = tx_adapter self.throttler = throttler + self.last_block_height = 0 + self.adapter = None + self.store_lock = None + + + def __get_adapter(self, block, force_reload=False): + if self.store_lock == None: + self.store_lock = StoreLock() + + reload = False + if block.number != self.last_block_height: + reload = True + elif self.adapter == None: + reload = True + elif force_reload: + reload = True + + self.last_block_height = block.number + + if reload: + while True: + logg.info('reloading adapter') + try: + self.adapter = ChaindFsAdapter( + self.chain_spec, + self.adapter_path, + self.tx_adapter, + None, + ) + break + except BackendError as e: + logg.error('adapter instantiation failed: {}, one more try'.format(e)) + self.store_lock.again() + continue + + return self.adapter def filter(self, conn, block, tx, session=None): cache_tx = None - store_lock = StoreLock() - queue_adapter = None + queue_adapter = self.__get_adapter(block) + + self.store_lock.reset() + while True: - try: - queue_adapter = ChaindFsAdapter( - self.chain_spec, - self.adapter_path, - self.tx_adapter, - None, - ) - except BackendError as e: - logg.error('adapter instantiation failed: {}, one more try'.format(e)) - store_lock.again() - continue - - store_lock.reset() - try: cache_tx = queue_adapter.get(tx.hash) break except NotLocalTxError: logg.debug('skipping not local transaction {}'.format(tx.hash)) + self.__stop_adapter() return False except BackendError as e: logg.error('adapter get failed: {}, one more try'.format(e)) - queue_adapter = None - store_lock.again() + self.store_lock.again() + queue_adapter = self.__get_adapter(block, force_reload=True) continue if cache_tx == None: raise NotLocalTxError(tx.hash) - store_lock = StoreLock() + self.store_lock.reset() + queue_lock = StoreLock(error=QueueLockError) while True: try: @@ -76,15 +102,18 @@ class StateFilter(SyncFilter): queue_lock.again() except FileNotFoundError as e: logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e)) - store_lock.again() + self.store_lock.again() + queue_adapter = self.__get_adapter(block, force_reload=True) continue except NotLocalTxError as e: logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e)) - store_lock.again() + self.store_lock.again() + queue_adapter = self.__get_adapter(block, force_reload=True) continue except StateLockedKey as e: logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e)) - store_lock.again() + self.store_lock.again() + queue_adapter = self.__get_adapter(block, force_reload=True) continue logg.info('filter registered {} for {} in {}'.format(tx.status.name, tx.hash, block)) diff --git a/setup.cfg b/setup.cfg index 9952003..4a62f63 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chaind -version = 0.2.8 +version = 0.2.9 description = Base package for chain queue service author = Louis Holbrook author_email = dev@holbrook.no