7 Commits

Author SHA1 Message Date
lash
4f96be2024 Upgrade chainqueue 2022-05-03 17:22:59 +00:00
lash
32e1bc6aa5 Correct purge call, add missing lock module 2022-05-02 20:10:30 +00:00
lash
387014f77b Purge items from memory state on final 2022-05-02 20:05:41 +00:00
lash
c3a592c0f6 Implement shep store lock 2022-05-02 09:59:13 +00:00
lash
5d61506133 WIP whackamole race condition problems 2022-05-01 07:55:51 +00:00
lash
5102b4ac6e Fix crashes related to race condition hits 2022-05-01 07:31:18 +00:00
lash
9b98703f24 Receive race handling from chainqueue, rehabilitate tests 2022-05-01 06:58:52 +00:00
11 changed files with 180 additions and 45 deletions

View File

@@ -1,3 +1,10 @@
- 0.2.2
* Fix missing symbol crashes related to race conditions
- 0.2.1
* Receive removed race checks from chainqueue
- 0.2.0
* primitive race condition handling between fs access of sync and queue
* re-enable throttling based on in-flight transaction count
- 0.1.2
* add settings object
- 0.1.0

View File

@@ -1,10 +1,27 @@
# standard imports
import logging
import time
# external imports
from chainqueue import Store as QueueStore
# local imports
from chaind.lock import StoreLock
logg = logging.getLogger(__name__)
class ChaindAdapter:
def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0):
self.cache_adapter = cache_adapter
self.dispatcher = dispatcher
self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache)
store_lock = StoreLock()
while True:
try:
self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache)
break
except FileNotFoundError as e:
logg.debug('queuestore instantiation failed, possible race condition (will try again): {}'.format(e))
store_lock.again()
continue

View File

@@ -1,6 +1,7 @@
# standard imports
import logging
import os
import time
# external imports
from chainlib.error import RPCException
@@ -10,12 +11,15 @@ from chainqueue.store.fs import (
IndexStore,
CounterStore,
)
from chainqueue.error import BackendIntegrityError
from shep.store.file import SimpleFileStoreFactory
from shep.error import StateInvalid
from shep.error import (
StateInvalid,
StateLockedKey,
)
# local imports
from .base import ChaindAdapter
from chaind.lock import StoreLock
logg = logging.getLogger(__name__)
@@ -23,7 +27,7 @@ logg = logging.getLogger(__name__)
class ChaindFsAdapter(ChaindAdapter):
def __init__(self, chain_spec, path, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0, digest_bytes=32, event_callback=None):
factory = SimpleFileStoreFactory(path).add
factory = SimpleFileStoreFactory(path, use_lock=True).add
state_store = Status(factory, allow_invalid=True, event_callback=event_callback)
index_path = os.path.join(path, 'tx')
index_store = IndexStore(index_path, digest_bytes=digest_bytes)
@@ -38,21 +42,29 @@ class ChaindFsAdapter(ChaindAdapter):
def get(self, tx_hash):
v = None
try:
v = self.store.get(tx_hash)
except StateInvalid as e:
logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e))
return None
except FileNotFoundError:
pass
if v ==None:
raise BackendIntegrityError(tx_hash)
store_lock = StoreLock()
while True:
try:
v = self.store.get(tx_hash)
break
except StateInvalid as e:
logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e))
return None
except FileNotFoundError as e:
logg.debug('queuestore get {} failed, possible race condition (will try again): {}'.format(tx_hash, e))
store_lock.again()
continue
except StateLockedKey as e:
logg.debug('queuestore get {} failed, possible race condition (will try again): {}'.format(tx_hash, e))
store_lock.again()
continue
return v[1]
def upcoming(self, limit=0):
real_limit = 0
in_flight = 0
in_flight = []
if limit > 0:
in_flight = self.store.by_state(state=self.store.IN_NETWORK, not_state=self.store.FINAL)
real_limit = limit - len(in_flight)
@@ -78,12 +90,19 @@ class ChaindFsAdapter(ChaindAdapter):
def succeed(self, block, tx):
if self.store.is_reserved(tx.hash):
raise QueueLockError(tx.hash)
return self.store.final(tx.hash, block, tx, error=False)
r = self.store.final(tx.hash, block, tx, error=False)
(k, v) = self.store.get(tx.hash)
self.store.purge(k)
return r
def fail(self, block, tx):
return self.store.final(tx.hash, block, tx, error=True)
if self.store.is_reserved(tx.hash):
raise QueueLockError(tx.hash)
r = self.store.final(tx.hash, block, tx, error=True)
(k, v) = self.store.get(tx.hash)
self.store.purge(k)
return r
def sendfail(self):
@@ -95,7 +114,22 @@ class ChaindFsAdapter(ChaindAdapter):
def dispatch(self, tx_hash):
entry = self.store.send_start(tx_hash)
entry = None
store_lock = StoreLock()
while True:
try:
entry = self.store.send_start(tx_hash)
break
except FileNotFoundError as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again()
continue
except StateLockedKey as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again()
continue
tx_wire = entry.serialize()
r = None
@@ -105,5 +139,18 @@ class ChaindFsAdapter(ChaindAdapter):
self.store.fail(tx_hash)
return False
self.store.send_end(tx_hash)
store_lock = StoreLock()
while True:
try:
self.store.send_end(tx_hash)
break
except FileNotFoundError as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again(e)
continue
except StateLockedKey as e:
logg.debug('dispatch failed to find {} in backend, will try again: {}'.format(tx_hash, e))
store_lock.again(e)
continue
return True

View File

@@ -20,3 +20,7 @@ class ClientInputError(ValueError):
class QueueLockError(Exception):
pass
class BackendError(Exception):
pass

View File

@@ -5,22 +5,22 @@ import time
# external imports
from chainlib.status import Status as TxStatus
from chainsyncer.filter import SyncFilter
from chainqueue.error import (
NotLocalTxError,
BackendIntegrityError,
)
from chainqueue.error import NotLocalTxError
from chaind.adapters.fs import ChaindFsAdapter
from shep.error import StateLockedKey
# local imports
from .error import QueueLockError
from .error import (
QueueLockError,
BackendError,
)
from chaind.lock import StoreLock
logg = logging.getLogger(__name__)
class StateFilter(SyncFilter):
delay_limit = 3.0
def __init__(self, chain_spec, adapter_path, tx_adapter, throttler=None):
self.chain_spec = chain_spec
self.adapter_path = adapter_path
@@ -30,8 +30,9 @@ class StateFilter(SyncFilter):
def filter(self, conn, block, tx, session=None):
cache_tx = None
for i in range(3):
queue_adapter = None
store_lock = StoreLock()
queue_adapter = None
while True:
try:
queue_adapter = ChaindFsAdapter(
self.chain_spec,
@@ -39,28 +40,31 @@ class StateFilter(SyncFilter):
self.tx_adapter,
None,
)
except BackendIntegrityError as e:
except BackendError as e:
logg.error('adapter instantiation failed: {}, one more try'.format(e))
store_lock.again()
continue
store_lock.reset()
try:
cache_tx = queue_adapter.get(tx.hash)
break
except NotLocalTxError:
logg.debug('skipping not local transaction {}'.format(tx.hash))
return False
except BackendIntegrityError as e:
except BackendError as e:
logg.error('adapter instantiation failed: {}, one more try'.format(e))
queue_adapter = None
store_lock.again()
continue
break
if cache_tx == None:
raise NotLocalTxError(tx.hash)
delay = 0.01
store_lock = StoreLock()
queue_lock = StoreLock(error=QueueLockError)
while True:
if delay > self.delay_limit:
raise QueueLockError('The queue lock for tx {} seems to be stuck. Human meddling needed.'.format(tx.hash))
try:
if tx.status == TxStatus.SUCCESS:
queue_adapter.succeed(block, tx)
@@ -69,8 +73,21 @@ class StateFilter(SyncFilter):
break
except QueueLockError as e:
logg.debug('queue item {} is blocked, will retry: {}'.format(tx.hash, e))
time.sleep(delay)
delay *= 2
queue_lock.again()
except FileNotFoundError as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
store_lock.again()
continue
except NotLocalTxError as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
store_lock.again()
continue
except StateLockedKey as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
store_lock.again()
continue
logg.info('filter registered {} for {} in {}'.format(tx.status.name, tx.hash, block))
if self.throttler != None:
self.throttler.dec(tx.hash)

34
chaind/lock.py Normal file
View File

@@ -0,0 +1,34 @@
# standard imports
import time
# local imports
from .error import BackendError
BASE_DELAY = 0.01
BASE_DELAY_LIMIT = 3.0
class StoreLock:
def __init__(self, delay=BASE_DELAY, delay_limit=BASE_DELAY_LIMIT, error=BackendError, description=None):
self.base_delay = delay
self.delay = delay
self.delay_limit = delay_limit
self.error = error
self.description = description
def again(self, e=None):
if self.delay > self.delay_limit:
err = None
if e != None:
err = str(e)
else:
err = self.description
raise self.error(err)
time.sleep(self.delay)
self.delay *= 2
def reset(self):
self.delay = self.base_delay

View File

@@ -8,12 +8,14 @@ import stat
from hexathon import strip_0x
# local imports
from chaind.error import (
from .error import (
NothingToDoError,
ClientGoneError,
ClientBlockError,
ClientInputError,
)
from .lock import StoreLock
from .error import BackendError
logg = logging.getLogger(__name__)
@@ -59,7 +61,16 @@ class SessionController:
def process(self, conn):
r = self.processor(self.chain_spec, self.adapter, conn)
state_lock = StoreLock()
r = None
while True:
try:
r = self.processor(self.chain_spec, self.adapter, conn)
break
except BackendError as e:
state_lock.again(e)
continue
if r > 0:
self.srv.settimeout(0.1)
else:

View File

@@ -29,8 +29,6 @@ class MockDispatcher:
def send(self, v):
import sys
sys.stderr.write('susu v {} {}\n'.format(v, self.fails))
if v in self.fails:
raise RPCException('{} is in fails'.format(v))
pass

View File

@@ -1,5 +1,5 @@
chainlib~=0.1.1
chainqueue~=0.1.6
chainqueue~=0.1.9
chainsyncer~=0.4.3
confini~=0.6.0
funga~=0.5.2

View File

@@ -1,6 +1,6 @@
[metadata]
name = chaind
version = 0.2.0
version = 0.2.3
description = Base package for chain queue service
author = Louis Holbrook
author_email = dev@holbrook.no

View File

@@ -74,7 +74,7 @@ class TestChaindFs(TestChaindFsBase):
data = os.urandom(128).hex()
hsh = self.adapter.put(data)
fltr = StateFilter(self.adapter)
fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
tx = MockTx(hsh)
fltr.filter(None, None, tx)
@@ -85,7 +85,7 @@ class TestChaindFs(TestChaindFsBase):
data = os.urandom(128).hex()
hsh = self.adapter.put(data)
fltr = StateFilter(self.adapter)
fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
tx = MockTx(hsh, TxStatus.ERROR)
fltr.filter(None, None, tx)