Fix crashes related to race condition hits
This commit is contained in:
parent
9b98703f24
commit
5102b4ac6e
@ -1,3 +1,7 @@
|
||||
- 0.2.2
|
||||
* Fix missing symbol crashes related to race conditions
|
||||
- 0.2.1
|
||||
* Receive removed race checks from chainqueue
|
||||
- 0.2.0
|
||||
* primitive race condition handling between fs access of sync and queue
|
||||
* re-enable throttling based on in-flight transaction count
|
||||
|
@ -1,9 +1,14 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
from chainqueue import Store as QueueStore
|
||||
|
||||
# local imports
|
||||
from chaind.error import BackendIntegrityError
|
||||
|
||||
logg = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChaindAdapter:
|
||||
|
||||
|
@ -61,19 +61,40 @@ class StateFilter(SyncFilter):
|
||||
raise NotLocalTxError(tx.hash)
|
||||
|
||||
delay = 0.01
|
||||
race_attempts = 0
|
||||
err = None
|
||||
while True:
|
||||
if delay > self.delay_limit:
|
||||
raise QueueLockError('The queue lock for tx {} seems to be stuck. Human meddling needed.'.format(tx.hash))
|
||||
elif race_attempts >= 3:
|
||||
break
|
||||
try:
|
||||
if tx.status == TxStatus.SUCCESS:
|
||||
queue_adapter.succeed(block, tx)
|
||||
else:
|
||||
queue_adapter.fail(block, tx)
|
||||
break
|
||||
err = None
|
||||
except QueueLockError as e:
|
||||
logg.debug('queue item {} is blocked, will retry: {}'.format(tx.hash, e))
|
||||
time.sleep(delay)
|
||||
delay *= 2
|
||||
err = None
|
||||
except FileNotFoundError as e:
|
||||
err = e
|
||||
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
|
||||
race_attempts += 1
|
||||
continue
|
||||
except NotLocalTxError as e:
|
||||
err = e
|
||||
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
|
||||
race_attempts += 1
|
||||
continue
|
||||
|
||||
if err != None:
|
||||
raise BackendIntegrityError('cannot find queue item {} in backend: {}'.format(tx.hash, err))
|
||||
|
||||
logg.info('filter registered {} for {} in {}'.format(tx.status.name, tx.hash, block))
|
||||
|
||||
if self.throttler != None:
|
||||
self.throttler.dec(tx.hash)
|
||||
|
Loading…
Reference in New Issue
Block a user