Implement shep store lock

This commit is contained in:
lash 2022-05-02 09:59:13 +00:00
parent 5d61506133
commit c3a592c0f6
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
5 changed files with 78 additions and 67 deletions

View File

@ -6,29 +6,22 @@ import time
from chainqueue import Store as QueueStore from chainqueue import Store as QueueStore
# local imports # local imports
from chaind.error import BackendIntegrityError from chaind.lock import StoreLock
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
class ChaindAdapter: class ChaindAdapter:
race_delay = 0.1
def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0): def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0):
self.cache_adapter = cache_adapter self.cache_adapter = cache_adapter
self.dispatcher = dispatcher self.dispatcher = dispatcher
err = None store_lock = StoreLock()
for i in range(3): while True:
try: try:
self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache) self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache)
err = None
break break
except FileNotFoundError as e: except FileNotFoundError as e:
logg.debug('queuestore instantiation failed, possible race condition (will try again): {}'.format(e)) logg.debug('queuestore instantiation failed, possible race condition (will try again): {}'.format(e))
err = e store_lock.again()
time.sleep(self.race_delay)
continue continue
if err != None:
raise BackendIntegrityError(err)

View File

@ -12,11 +12,14 @@ from chainqueue.store.fs import (
CounterStore, CounterStore,
) )
from shep.store.file import SimpleFileStoreFactory from shep.store.file import SimpleFileStoreFactory
from shep.error import StateInvalid from shep.error import (
StateInvalid,
StateLockedKey,
)
# local imports # local imports
from .base import ChaindAdapter from .base import ChaindAdapter
from chaind.error import BackendIntegrityError from chaind.lock import StoreLock
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@ -24,7 +27,7 @@ logg = logging.getLogger(__name__)
class ChaindFsAdapter(ChaindAdapter): class ChaindFsAdapter(ChaindAdapter):
def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32, event_callback=None): def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32, event_callback=None):
factory = SimpleFileStoreFactory(path).add factory = SimpleFileStoreFactory(path, use_lock=True).add
state_store = Status(factory, allow_invalid=True, event_callback=event_callback) state_store = Status(factory, allow_invalid=True, event_callback=event_callback)
index_path = os.path.join(path, 'tx') index_path = os.path.join(path, 'tx')
index_store = IndexStore(index_path, digest_bytes=digest_bytes) index_store = IndexStore(index_path, digest_bytes=digest_bytes)
@ -39,22 +42,23 @@ class ChaindFsAdapter(ChaindAdapter):
def get(self, tx_hash): def get(self, tx_hash):
v = None v = None
err = None store_lock = StoreLock()
for i in range(3): while True:
try: try:
v = self.store.get(tx_hash) v = self.store.get(tx_hash)
err = None
break break
except StateInvalid as e: except StateInvalid as e:
logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e)) logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e))
return None return None
except FileNotFoundError as e: except FileNotFoundError as e:
err = e
time.sleep(self.race_delay)
logg.debug('queuestore get {} failed, possible race condition (will try again): {}'.format(tx_hash, e)) logg.debug('queuestore get {} failed, possible race condition (will try again): {}'.format(tx_hash, e))
store_lock.again()
continue continue
if err != None: except StateLockedKey as e:
raise BackendIntegrityError(tx_hash) logg.debug('queuestore get {} failed, possible race condition (will try again): {}'.format(tx_hash, e))
store_lock.again()
continue
return v[1] return v[1]
@ -104,20 +108,20 @@ class ChaindFsAdapter(ChaindAdapter):
def dispatch(self, tx_hash): def dispatch(self, tx_hash):
entry = None entry = None
err = None
for i in range(3): store_lock = StoreLock()
while True:
try: try:
entry = self.store.send_start(tx_hash) entry = self.store.send_start(tx_hash)
err = None
break break
except FileNotFoundError as e: except FileNotFoundError as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, err)) logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
err = e store_lock.again()
time.sleep(self.race_delay) continue
except StateLockedKey as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again()
continue continue
if err != None:
raise BackendIntegrityError('dispatch failed to find {} in backend: {}'.format(tx_hash, err))
tx_wire = entry.serialize() tx_wire = entry.serialize()
@ -128,5 +132,18 @@ class ChaindFsAdapter(ChaindAdapter):
self.store.fail(tx_hash) self.store.fail(tx_hash)
return False return False
self.store.send_end(tx_hash) store_lock = StoreLock()
while True:
try:
self.store.send_end(tx_hash)
break
except FileNotFoundError as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again(e)
continue
except StateLockedKey as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again(e)
continue
return True return True

View File

@ -22,5 +22,5 @@ class QueueLockError(Exception):
pass pass
class BackendIntegrityError(Exception): class BackendError(Exception):
pass pass

View File

@ -7,21 +7,20 @@ from chainlib.status import Status as TxStatus
from chainsyncer.filter import SyncFilter from chainsyncer.filter import SyncFilter
from chainqueue.error import NotLocalTxError from chainqueue.error import NotLocalTxError
from chaind.adapters.fs import ChaindFsAdapter from chaind.adapters.fs import ChaindFsAdapter
from shep.error import StateLockedKey
# local imports # local imports
from .error import ( from .error import (
QueueLockError, QueueLockError,
BackendIntegrityError, BackendError,
) )
from chaind.lock import StoreLock
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
class StateFilter(SyncFilter): class StateFilter(SyncFilter):
delay_limit = 3.0
race_delay = 0.1
def __init__(self, chain_spec, adapter_path, tx_adapter, throttler=None): def __init__(self, chain_spec, adapter_path, tx_adapter, throttler=None):
self.chain_spec = chain_spec self.chain_spec = chain_spec
self.adapter_path = adapter_path self.adapter_path = adapter_path
@ -31,8 +30,9 @@ class StateFilter(SyncFilter):
def filter(self, conn, block, tx, session=None): def filter(self, conn, block, tx, session=None):
cache_tx = None cache_tx = None
for i in range(3): store_lock = StoreLock()
queue_adapter = None queue_adapter = None
while True:
try: try:
queue_adapter = ChaindFsAdapter( queue_adapter = ChaindFsAdapter(
self.chain_spec, self.chain_spec,
@ -40,62 +40,52 @@ class StateFilter(SyncFilter):
self.tx_adapter, self.tx_adapter,
None, None,
) )
except BackendIntegrityError as e: except BackendError as e:
logg.error('adapter instantiation failed: {}, one more try'.format(e)) logg.error('adapter instantiation failed: {}, one more try'.format(e))
time.sleep(self.race_delay) store_lock.again()
continue continue
store_lock.reset()
try: try:
cache_tx = queue_adapter.get(tx.hash) cache_tx = queue_adapter.get(tx.hash)
break
except NotLocalTxError: except NotLocalTxError:
logg.debug('skipping not local transaction {}'.format(tx.hash)) logg.debug('skipping not local transaction {}'.format(tx.hash))
return False return False
except BackendIntegrityError as e: except BackendError as e:
logg.error('adapter instantiation failed: {}, one more try'.format(e)) logg.error('adapter instantiation failed: {}, one more try'.format(e))
time.sleep(self.race_delay) queue_adapter = None
store_lock.again()
continue continue
break
if cache_tx == None: if cache_tx == None:
raise NotLocalTxError(tx.hash) raise NotLocalTxError(tx.hash)
delay = 0.01 store_lock = StoreLock()
race_attempts = 0 queue_lock = StoreLock(error=QueueLockError)
err = None
while True: while True:
if delay > self.delay_limit:
raise QueueLockError('The queue lock for tx {} seems to be stuck. Human meddling needed.'.format(tx.hash))
elif race_attempts >= 3:
break
try: try:
if tx.status == TxStatus.SUCCESS: if tx.status == TxStatus.SUCCESS:
queue_adapter.succeed(block, tx) queue_adapter.succeed(block, tx)
else: else:
queue_adapter.fail(block, tx) queue_adapter.fail(block, tx)
err = None
break break
except QueueLockError as e: except QueueLockError as e:
logg.debug('queue item {} is blocked, will retry: {}'.format(tx.hash, e)) logg.debug('queue item {} is blocked, will retry: {}'.format(tx.hash, e))
time.sleep(delay) queue_lock.again()
delay *= 2
race_attempts = 0
err = None
except FileNotFoundError as e: except FileNotFoundError as e:
err = e
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e)) logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
race_attempts += 1 store_lock.again()
time.sleep(self.race_delay)
continue continue
except NotLocalTxError as e: except NotLocalTxError as e:
err = e
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e)) logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
race_attempts += 1 store_lock.again()
time.sleep(self.race_delay) continue
except StateLockedKey as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
store_lock.again()
continue continue
if err != None:
raise BackendIntegrityError('cannot find queue item {} in backend: {}'.format(tx.hash, err))
logg.info('filter registered {} for {} in {}'.format(tx.status.name, tx.hash, block)) logg.info('filter registered {} for {} in {}'.format(tx.status.name, tx.hash, block))

View File

@ -8,12 +8,14 @@ import stat
from hexathon import strip_0x from hexathon import strip_0x
# local imports # local imports
from chaind.error import ( from .error import (
NothingToDoError, NothingToDoError,
ClientGoneError, ClientGoneError,
ClientBlockError, ClientBlockError,
ClientInputError, ClientInputError,
) )
from .lock import StoreLock
from .error import BackendError
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@ -59,7 +61,16 @@ class SessionController:
def process(self, conn): def process(self, conn):
r = self.processor(self.chain_spec, self.adapter, conn) state_lock = StoreLock()
r = None
while True:
try:
r = self.processor(self.chain_spec, self.adapter, conn)
break
except BackendError as e:
state_lock.again(e)
continue
if r > 0: if r > 0:
self.srv.settimeout(0.1) self.srv.settimeout(0.1)
else: else: