Receive race handling from chainqueue, rehabilitate tests
This commit is contained in:
parent
4be5325df2
commit
9b98703f24
@ -1,3 +1,6 @@
|
|||||||
|
- 0.2.0
|
||||||
|
* primitive race condition handling between fs access of sync and queue
|
||||||
|
* re-enable throttling based on in-flight transaction count
|
||||||
- 0.1.2
|
- 0.1.2
|
||||||
* add settings object
|
* add settings object
|
||||||
- 0.1.0
|
- 0.1.0
|
||||||
|
@ -1,10 +1,28 @@
|
|||||||
# external imports
|
# external imports
|
||||||
from chainqueue import Store as QueueStore
|
from chainqueue import Store as QueueStore
|
||||||
|
|
||||||
|
# local imports
|
||||||
|
from chaind.error import BackendIntegrityError
|
||||||
|
|
||||||
|
|
||||||
class ChaindAdapter:
|
class ChaindAdapter:
|
||||||
|
|
||||||
|
race_delay = 0.1
|
||||||
|
|
||||||
def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0):
|
def __init__(self, chain_spec, state_store, index_store, counter_store, cache_adapter, dispatcher, cache=None, pending_retry_threshold=0, error_retry_threshold=0):
|
||||||
self.cache_adapter = cache_adapter
|
self.cache_adapter = cache_adapter
|
||||||
self.dispatcher = dispatcher
|
self.dispatcher = dispatcher
|
||||||
|
err = None
|
||||||
|
for i in range(3):
|
||||||
|
try:
|
||||||
self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache)
|
self.store = QueueStore(chain_spec, state_store, index_store, counter_store, cache=cache)
|
||||||
|
err = None
|
||||||
|
break
|
||||||
|
except FileNotFoundError as e:
|
||||||
|
logg.debug('queuestore instantiation failed, possible race condition (will try again): {}'.format(tx_hash, e))
|
||||||
|
err = e
|
||||||
|
time.sleep(self.race_delay)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if err != None:
|
||||||
|
raise BackendIntegrityError(err)
|
||||||
|
@ -10,12 +10,12 @@ from chainqueue.store.fs import (
|
|||||||
IndexStore,
|
IndexStore,
|
||||||
CounterStore,
|
CounterStore,
|
||||||
)
|
)
|
||||||
from chainqueue.error import BackendIntegrityError
|
|
||||||
from shep.store.file import SimpleFileStoreFactory
|
from shep.store.file import SimpleFileStoreFactory
|
||||||
from shep.error import StateInvalid
|
from shep.error import StateInvalid
|
||||||
|
|
||||||
# local imports
|
# local imports
|
||||||
from .base import ChaindAdapter
|
from .base import ChaindAdapter
|
||||||
|
from chaind.error import BackendIntegrityError
|
||||||
|
|
||||||
logg = logging.getLogger(__name__)
|
logg = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -38,13 +38,19 @@ class ChaindFsAdapter(ChaindAdapter):
|
|||||||
|
|
||||||
def get(self, tx_hash):
|
def get(self, tx_hash):
|
||||||
v = None
|
v = None
|
||||||
|
err = None
|
||||||
|
for i in range(3):
|
||||||
try:
|
try:
|
||||||
v = self.store.get(tx_hash)
|
v = self.store.get(tx_hash)
|
||||||
|
err = None
|
||||||
except StateInvalid as e:
|
except StateInvalid as e:
|
||||||
logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e))
|
logg.error('I am just a simple syncer and do not know how to handle the state which the tx {} is in: {}'.format(tx_hash, e))
|
||||||
return None
|
return None
|
||||||
except FileNotFoundError:
|
except FileNotFoundError as e:
|
||||||
pass
|
err = e
|
||||||
|
time.sleep(self.race_delay)
|
||||||
|
logg.debug('queuestore get {} failed, possible race condition (will try again): {}'.format(tx_hash, e))
|
||||||
|
continue
|
||||||
if v ==None:
|
if v ==None:
|
||||||
raise BackendIntegrityError(tx_hash)
|
raise BackendIntegrityError(tx_hash)
|
||||||
return v[1]
|
return v[1]
|
||||||
@ -52,7 +58,7 @@ class ChaindFsAdapter(ChaindAdapter):
|
|||||||
|
|
||||||
def upcoming(self, limit=0):
|
def upcoming(self, limit=0):
|
||||||
real_limit = 0
|
real_limit = 0
|
||||||
in_flight = 0
|
in_flight = []
|
||||||
if limit > 0:
|
if limit > 0:
|
||||||
in_flight = self.store.by_state(state=self.store.IN_NETWORK, not_state=self.store.FINAL)
|
in_flight = self.store.by_state(state=self.store.IN_NETWORK, not_state=self.store.FINAL)
|
||||||
real_limit = limit - len(in_flight)
|
real_limit = limit - len(in_flight)
|
||||||
|
@ -20,3 +20,7 @@ class ClientInputError(ValueError):
|
|||||||
|
|
||||||
class QueueLockError(Exception):
|
class QueueLockError(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class BackendIntegrityError(Exception):
|
||||||
|
pass
|
||||||
|
@ -5,14 +5,14 @@ import time
|
|||||||
# external imports
|
# external imports
|
||||||
from chainlib.status import Status as TxStatus
|
from chainlib.status import Status as TxStatus
|
||||||
from chainsyncer.filter import SyncFilter
|
from chainsyncer.filter import SyncFilter
|
||||||
from chainqueue.error import (
|
from chainqueue.error import NotLocalTxError
|
||||||
NotLocalTxError,
|
|
||||||
BackendIntegrityError,
|
|
||||||
)
|
|
||||||
from chaind.adapters.fs import ChaindFsAdapter
|
from chaind.adapters.fs import ChaindFsAdapter
|
||||||
|
|
||||||
# local imports
|
# local imports
|
||||||
from .error import QueueLockError
|
from .error import (
|
||||||
|
QueueLockError,
|
||||||
|
BackendIntegrityError,
|
||||||
|
)
|
||||||
|
|
||||||
logg = logging.getLogger(__name__)
|
logg = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -20,6 +20,7 @@ logg = logging.getLogger(__name__)
|
|||||||
class StateFilter(SyncFilter):
|
class StateFilter(SyncFilter):
|
||||||
|
|
||||||
delay_limit = 3.0
|
delay_limit = 3.0
|
||||||
|
race_delay = 0.1
|
||||||
|
|
||||||
def __init__(self, chain_spec, adapter_path, tx_adapter, throttler=None):
|
def __init__(self, chain_spec, adapter_path, tx_adapter, throttler=None):
|
||||||
self.chain_spec = chain_spec
|
self.chain_spec = chain_spec
|
||||||
@ -41,6 +42,7 @@ class StateFilter(SyncFilter):
|
|||||||
)
|
)
|
||||||
except BackendIntegrityError as e:
|
except BackendIntegrityError as e:
|
||||||
logg.error('adapter instantiation failed: {}, one more try'.format(e))
|
logg.error('adapter instantiation failed: {}, one more try'.format(e))
|
||||||
|
time.sleep(self.race_delay)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -50,6 +52,7 @@ class StateFilter(SyncFilter):
|
|||||||
return False
|
return False
|
||||||
except BackendIntegrityError as e:
|
except BackendIntegrityError as e:
|
||||||
logg.error('adapter instantiation failed: {}, one more try'.format(e))
|
logg.error('adapter instantiation failed: {}, one more try'.format(e))
|
||||||
|
time.sleep(self.race_delay)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
break
|
break
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
[metadata]
|
[metadata]
|
||||||
name = chaind
|
name = chaind
|
||||||
version = 0.2.0
|
version = 0.2.1
|
||||||
description = Base package for chain queue service
|
description = Base package for chain queue service
|
||||||
author = Louis Holbrook
|
author = Louis Holbrook
|
||||||
author_email = dev@holbrook.no
|
author_email = dev@holbrook.no
|
||||||
|
@ -74,7 +74,7 @@ class TestChaindFs(TestChaindFsBase):
|
|||||||
data = os.urandom(128).hex()
|
data = os.urandom(128).hex()
|
||||||
hsh = self.adapter.put(data)
|
hsh = self.adapter.put(data)
|
||||||
|
|
||||||
fltr = StateFilter(self.adapter)
|
fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
|
||||||
tx = MockTx(hsh)
|
tx = MockTx(hsh)
|
||||||
fltr.filter(None, None, tx)
|
fltr.filter(None, None, tx)
|
||||||
|
|
||||||
@ -85,7 +85,7 @@ class TestChaindFs(TestChaindFsBase):
|
|||||||
data = os.urandom(128).hex()
|
data = os.urandom(128).hex()
|
||||||
hsh = self.adapter.put(data)
|
hsh = self.adapter.put(data)
|
||||||
|
|
||||||
fltr = StateFilter(self.adapter)
|
fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
|
||||||
tx = MockTx(hsh, TxStatus.ERROR)
|
tx = MockTx(hsh, TxStatus.ERROR)
|
||||||
fltr.filter(None, None, tx)
|
fltr.filter(None, None, tx)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user