5 Commits

Author SHA1 Message Date
lash
c2f55f073e Add missing lock file 2022-05-02 20:12:15 +00:00
lash
c3a592c0f6 Implement shep store lock 2022-05-02 09:59:13 +00:00
lash
5d61506133 WIP whackamole race condition problems 2022-05-01 07:55:51 +00:00
lash
5102b4ac6e Fix crashes related to race condition hits 2022-05-01 07:31:18 +00:00
lash
9b98703f24 Receive race handling from chainqueue, rehabilitate tests 2022-05-01 06:58:52 +00:00
10 changed files with 170 additions and 40 deletions

View File

@@ -1,3 +1,10 @@
- 0.2.2
* Fix missing symbol crashes related to race conditions
- 0.2.1
* Receive removed race checks from chainqueue
- 0.2.0
* primitive race condition handling between fs access of sync and queue
* re-enable throttling based on in-flight transaction count
- 0.1.2 - 0.1.2
* add settings object * add settings object
- 0.1.0 - 0.1.0

View File

@@ -1,10 +1,27 @@
# standard imports
import logging
import time
# external imports # external imports
from chainqueue import Store as QueueStore from chainqueue import Store as QueueStore
# local imports
from chaind.lock import StoreLock
logg = logging.getLogger(__name__)
class ChaindAdapter: class ChaindAdapter:
def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0): def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0):
self.cache_adapter = cache_adapter self.cache_adapter = cache_adapter
self.dispatcher = dispatcher self.dispatcher = dispatcher
self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache) store_lock = StoreLock()
while True:
try:
self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache)
break
except FileNotFoundError as e:
logg.debug('queuestore instantiation failed, possible race condition (will try again): {}'.format(e))
store_lock.again()
continue

View File

@@ -1,6 +1,7 @@
# standard imports # standard imports
import logging import logging
import os import os
import time
# external imports # external imports
from chainlib.error import RPCException from chainlib.error import RPCException
@@ -10,12 +11,15 @@ from chainqueue.store.fs import (
IndexStore, IndexStore,
CounterStore, CounterStore,
) )
from chainqueue.error import BackendIntegrityError
from shep.store.file import SimpleFileStoreFactory from shep.store.file import SimpleFileStoreFactory
from shep.error import StateInvalid from shep.error import (
StateInvalid,
StateLockedKey,
)
# local imports # local imports
from .base import ChaindAdapter from .base import ChaindAdapter
from chaind.lock import StoreLock
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@@ -23,7 +27,7 @@ logg = logging.getLogger(__name__)
class ChaindFsAdapter(ChaindAdapter): class ChaindFsAdapter(ChaindAdapter):
def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32, event_callback=None): def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32, event_callback=None):
factory = SimpleFileStoreFactory(path).add factory = SimpleFileStoreFactory(path, use_lock=True).add
state_store = Status(factory, allow_invalid=True, event_callback=event_callback) state_store = Status(factory, allow_invalid=True, event_callback=event_callback)
index_path = os.path.join(path, 'tx') index_path = os.path.join(path, 'tx')
index_store = IndexStore(index_path, digest_bytes=digest_bytes) index_store = IndexStore(index_path, digest_bytes=digest_bytes)
@@ -38,21 +42,29 @@ class ChaindFsAdapter(ChaindAdapter):
def get(self, tx_hash): def get(self, tx_hash):
v = None v = None
try: store_lock = StoreLock()
v = self.store.get(tx_hash) while True:
except StateInvalid as e: try:
logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e)) v = self.store.get(tx_hash)
return None break
except FileNotFoundError: except StateInvalid as e:
pass logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e))
if v ==None: return None
raise BackendIntegrityError(tx_hash) except FileNotFoundError as e:
logg.debug('queuestore get {} failed, possible race condition (will try again): {}'.format(tx_hash, e))
store_lock.again()
continue
except StateLockedKey as e:
logg.debug('queuestore get {} failed, possible race condition (will try again): {}'.format(tx_hash, e))
store_lock.again()
continue
return v[1] return v[1]
def upcoming(self, limit=0): def upcoming(self, limit=0):
real_limit = 0 real_limit = 0
in_flight = 0 in_flight = []
if limit > 0: if limit > 0:
in_flight = self.store.by_state(state=self.store.IN_NETWORK, not_state=self.store.FINAL) in_flight = self.store.by_state(state=self.store.IN_NETWORK, not_state=self.store.FINAL)
real_limit = limit - len(in_flight) real_limit = limit - len(in_flight)
@@ -95,7 +107,22 @@ class ChaindFsAdapter(ChaindAdapter):
def dispatch(self, tx_hash): def dispatch(self, tx_hash):
entry = self.store.send_start(tx_hash) entry = None
store_lock = StoreLock()
while True:
try:
entry = self.store.send_start(tx_hash)
break
except FileNotFoundError as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again()
continue
except StateLockedKey as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again()
continue
tx_wire = entry.serialize() tx_wire = entry.serialize()
r = None r = None
@@ -105,5 +132,18 @@ class ChaindFsAdapter(ChaindAdapter):
self.store.fail(tx_hash) self.store.fail(tx_hash)
return False return False
self.store.send_end(tx_hash) store_lock = StoreLock()
while True:
try:
self.store.send_end(tx_hash)
break
except FileNotFoundError as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again(e)
continue
except StateLockedKey as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again(e)
continue
return True return True

View File

@@ -20,3 +20,7 @@ class ClientInputError(ValueError):
class QueueLockError(Exception): class QueueLockError(Exception):
pass pass
class BackendError(Exception):
pass

View File

@@ -5,22 +5,22 @@ import time
# external imports # external imports
from chainlib.status import Status as TxStatus from chainlib.status import Status as TxStatus
from chainsyncer.filter import SyncFilter from chainsyncer.filter import SyncFilter
from chainqueue.error import ( from chainqueue.error import NotLocalTxError
NotLocalTxError,
BackendIntegrityError,
)
from chaind.adapters.fs import ChaindFsAdapter from chaind.adapters.fs import ChaindFsAdapter
from shep.error import StateLockedKey
# local imports # local imports
from .error import QueueLockError from .error import (
QueueLockError,
BackendError,
)
from chaind.lock import StoreLock
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
class StateFilter(SyncFilter): class StateFilter(SyncFilter):
delay_limit = 3.0
def __init__(self, chain_spec, adapter_path, tx_adapter, throttler=None): def __init__(self, chain_spec, adapter_path, tx_adapter, throttler=None):
self.chain_spec = chain_spec self.chain_spec = chain_spec
self.adapter_path = adapter_path self.adapter_path = adapter_path
@@ -30,8 +30,9 @@ class StateFilter(SyncFilter):
def filter(self, conn, block, tx, session=None): def filter(self, conn, block, tx, session=None):
cache_tx = None cache_tx = None
for i in range(3): store_lock = StoreLock()
queue_adapter = None queue_adapter = None
while True:
try: try:
queue_adapter = ChaindFsAdapter( queue_adapter = ChaindFsAdapter(
self.chain_spec, self.chain_spec,
@@ -39,28 +40,31 @@ class StateFilter(SyncFilter):
self.tx_adapter, self.tx_adapter,
None, None,
) )
except BackendIntegrityError as e: except BackendError as e:
logg.error('adapter instantiation failed: {}, one more try'.format(e)) logg.error('adapter instantiation failed: {}, one more try'.format(e))
store_lock.again()
continue continue
store_lock.reset()
try: try:
cache_tx = queue_adapter.get(tx.hash) cache_tx = queue_adapter.get(tx.hash)
break
except NotLocalTxError: except NotLocalTxError:
logg.debug('skipping not local transaction {}'.format(tx.hash)) logg.debug('skipping not local transaction {}'.format(tx.hash))
return False return False
except BackendIntegrityError as e: except BackendError as e:
logg.error('adapter instantiation failed: {}, one more try'.format(e)) logg.error('adapter instantiation failed: {}, one more try'.format(e))
queue_adapter = None
store_lock.again()
continue continue
break
if cache_tx == None: if cache_tx == None:
raise NotLocalTxError(tx.hash) raise NotLocalTxError(tx.hash)
delay = 0.01 store_lock = StoreLock()
queue_lock = StoreLock(error=QueueLockError)
while True: while True:
if delay > self.delay_limit:
raise QueueLockError('The queue lock for tx {} seems to be stuck. Human meddling needed.'.format(tx.hash))
try: try:
if tx.status == TxStatus.SUCCESS: if tx.status == TxStatus.SUCCESS:
queue_adapter.succeed(block, tx) queue_adapter.succeed(block, tx)
@@ -69,8 +73,21 @@ class StateFilter(SyncFilter):
break break
except QueueLockError as e: except QueueLockError as e:
logg.debug('queue item {} is blocked, will retry: {}'.format(tx.hash, e)) logg.debug('queue item {} is blocked, will retry: {}'.format(tx.hash, e))
time.sleep(delay) queue_lock.again()
delay *= 2 except FileNotFoundError as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
store_lock.again()
continue
except NotLocalTxError as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
store_lock.again()
continue
except StateLockedKey as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
store_lock.again()
continue
logg.info('filter registered {} for {} in {}'.format(tx.status.name, tx.hash, block))
if self.throttler != None: if self.throttler != None:
self.throttler.dec(tx.hash) self.throttler.dec(tx.hash)

34
chaind/lock.py Normal file
View File

@@ -0,0 +1,34 @@
# standard imports
import time
# local imports
from .error import BackendError
BASE_DELAY = 0.01
BASE_DELAY_LIMIT = 3.0
class StoreLock:
def __init__(self, delay=BASE_DELAY, delay_limit=BASE_DELAY_LIMIT, error=BackendError, description=None):
self.base_delay = delay
self.delay = delay
self.delay_limit = delay_limit
self.error = error
self.description = description
def again(self, e=None):
if self.delay > self.delay_limit:
err = None
if e != None:
err = str(e)
else:
err = self.description
raise self.error(err)
time.sleep(self.delay)
self.delay *= 2
def reset(self):
self.delay = self.base_delay

View File

@@ -8,12 +8,14 @@ import stat
from hexathon import strip_0x from hexathon import strip_0x
# local imports # local imports
from chaind.error import ( from .error import (
NothingToDoError, NothingToDoError,
ClientGoneError, ClientGoneError,
ClientBlockError, ClientBlockError,
ClientInputError, ClientInputError,
) )
from .lock import StoreLock
from .error import BackendError
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@@ -59,7 +61,16 @@ class SessionController:
def process(self, conn): def process(self, conn):
r = self.processor(self.chain_spec, self.adapter, conn) state_lock = StoreLock()
r = None
while True:
try:
r = self.processor(self.chain_spec, self.adapter, conn)
break
except BackendError as e:
state_lock.again(e)
continue
if r > 0: if r > 0:
self.srv.settimeout(0.1) self.srv.settimeout(0.1)
else: else:

View File

@@ -1,5 +1,5 @@
chainlib~=0.1.1 chainlib~=0.1.1
chainqueue~=0.1.6 chainqueue~=0.1.8
chainsyncer~=0.4.3 chainsyncer~=0.4.3
confini~=0.6.0 confini~=0.6.0
funga~=0.5.2 funga~=0.5.2

View File

@@ -1,6 +1,6 @@
[metadata] [metadata]
name = chaind name = chaind
version = 0.2.0 version = 0.2.2
description = Base package for chain queue service description = Base package for chain queue service
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no

View File

@@ -74,7 +74,7 @@ class TestChaindFs(TestChaindFsBase):
data = os.urandom(128).hex() data = os.urandom(128).hex()
hsh = self.adapter.put(data) hsh = self.adapter.put(data)
fltr = StateFilter(self.adapter) fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
tx = MockTx(hsh) tx = MockTx(hsh)
fltr.filter(None, None, tx) fltr.filter(None, None, tx)
@@ -85,7 +85,7 @@ class TestChaindFs(TestChaindFsBase):
data = os.urandom(128).hex() data = os.urandom(128).hex()
hsh = self.adapter.put(data) hsh = self.adapter.put(data)
fltr = StateFilter(self.adapter) fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
tx = MockTx(hsh, TxStatus.ERROR) tx = MockTx(hsh, TxStatus.ERROR)
fltr.filter(None, None, tx) fltr.filter(None, None, tx)