7 Commits

Author SHA1 Message Date
lash
0a5818ebf1 Update settings for queue and syncer 2022-05-10 06:13:56 +00:00
lash
5d2d73fa64 Upgrade chainqueue, chainsyncer 2022-05-09 19:44:28 +00:00
lash
dd1879bb91 Implement mocks on generic tx, block 2022-05-09 19:25:31 +00:00
lash
e264ed5c37 Upgrade deps, to avoid shep sync on persist set 2022-05-05 17:08:09 +00:00
lash
d472bd4f7c Upgrade deps, handle filestore list exception in shep 2022-05-05 15:41:30 +00:00
lash
465d692956 Upgrade deps 2022-05-05 15:02:21 +00:00
lash
81c1207828 Recreate adapter only per block 2022-05-05 12:09:07 +00:00
7 changed files with 114 additions and 38 deletions

View File

@@ -1,3 +1,12 @@
- 0.2.12
* Breaking upgrade of chainlib.
* Implement generic block and tx.
- 0.2.11
* Upgrade shep to handle exception in filestore list
- 0.2.10
* Upgrade shep to guarantee state lock atomicity
- 0.2.9
* Minimize instantiations of adapters in filter execution
- 0.2.8 - 0.2.8
* Upgrade chainsyncer * Upgrade chainsyncer
- 0.2.7 - 0.2.7

View File

@@ -26,27 +26,51 @@ class StateFilter(SyncFilter):
self.adapter_path = adapter_path self.adapter_path = adapter_path
self.tx_adapter = tx_adapter self.tx_adapter = tx_adapter
self.throttler = throttler self.throttler = throttler
self.last_block_height = 0
self.adapter = None
self.store_lock = None
def __get_adapter(self, block, force_reload=False):
if self.store_lock == None:
self.store_lock = StoreLock()
reload = False
if block.number != self.last_block_height:
reload = True
elif self.adapter == None:
reload = True
elif force_reload:
reload = True
self.last_block_height = block.number
if reload:
while True:
logg.info('reloading adapter')
try:
self.adapter = ChaindFsAdapter(
self.chain_spec,
self.adapter_path,
self.tx_adapter,
None,
)
break
except BackendError as e:
logg.error('adapter instantiation failed: {}, one more try'.format(e))
self.store_lock.again()
continue
return self.adapter
def filter(self, conn, block, tx, session=None): def filter(self, conn, block, tx, session=None):
cache_tx = None cache_tx = None
store_lock = StoreLock() queue_adapter = self.__get_adapter(block)
queue_adapter = None
self.store_lock.reset()
while True: while True:
try:
queue_adapter = ChaindFsAdapter(
self.chain_spec,
self.adapter_path,
self.tx_adapter,
None,
)
except BackendError as e:
logg.error('adapter instantiation failed: {}, one more try'.format(e))
store_lock.again()
continue
store_lock.reset()
try: try:
cache_tx = queue_adapter.get(tx.hash) cache_tx = queue_adapter.get(tx.hash)
break break
@@ -55,14 +79,15 @@ class StateFilter(SyncFilter):
return False return False
except BackendError as e: except BackendError as e:
logg.error('adapter get failed: {}, one more try'.format(e)) logg.error('adapter get failed: {}, one more try'.format(e))
queue_adapter = None self.store_lock.again()
store_lock.again() queue_adapter = self.__get_adapter(block, force_reload=True)
continue continue
if cache_tx == None: if cache_tx == None:
raise NotLocalTxError(tx.hash) raise NotLocalTxError(tx.hash)
store_lock = StoreLock() self.store_lock.reset()
queue_lock = StoreLock(error=QueueLockError) queue_lock = StoreLock(error=QueueLockError)
while True: while True:
try: try:
@@ -76,18 +101,21 @@ class StateFilter(SyncFilter):
queue_lock.again() queue_lock.again()
except FileNotFoundError as e: except FileNotFoundError as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e)) logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
store_lock.again() self.store_lock.again()
queue_adapter = self.__get_adapter(block, force_reload=True)
continue continue
except NotLocalTxError as e: except NotLocalTxError as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e)) logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
store_lock.again() self.store_lock.again()
queue_adapter = self.__get_adapter(block, force_reload=True)
continue continue
except StateLockedKey as e: except StateLockedKey as e:
logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e)) logg.debug('queue item {} not found, possible race condition, will retry: {}'.format(tx.hash, e))
store_lock.again() self.store_lock.again()
queue_adapter = self.__get_adapter(block, force_reload=True)
continue continue
logg.info('filter registered {} for {} in {}'.format(tx.status.name, tx.hash, block)) logg.info('filter registered {} for {} in {}'.format(tx.status_name, tx.hash, block))
if self.throttler != None: if self.throttler != None:
self.throttler.dec(tx.hash) self.throttler.dec(tx.hash)

View File

@@ -4,6 +4,7 @@ import os
import uuid import uuid
# external imports # external imports
from chainlib.settings import ChainSettings
from chainsyncer.settings import ChainsyncerSettings from chainsyncer.settings import ChainsyncerSettings
from chainqueue.settings import ChainqueueSettings from chainqueue.settings import ChainqueueSettings
@@ -106,18 +107,39 @@ class ChaindSettings(ChainsyncerSettings, ChainqueueSettings):
raise ValueError('at least one backend must be set') raise ValueError('at least one backend must be set')
def process_chaind_queue(self, config):
if config.get('QUEUE_STATE_PATH') == None:
queue_state_dir = self.dir_for('queue')
config.add(queue_state_dir, 'QUEUE_STATE_PATH', False)
logg.debug('setting queue state path {}'.format(queue_state_dir))
self.process_queue_tx(config)
self.process_queue_paths(config)
if config.get('QUEUE_BACKEND') == 'fs':
self.process_queue_backend_fs(config)
self.process_queue_backend(config)
self.process_queue_store(config)
def process(self, config): def process(self, config):
super(ChaindSettings, self).process(config) #super(ChaindSettings, self).process(config)
if self.include_sync: self.process_common(config)
self.process_sync(config)
self.process_sync_backend(config)
if self.include_queue: if self.include_queue:
self.process_queue_backend(config) self.process_queue_backend(config)
self.process_dispatch(config) if self.include_sync:
self.process_token(config) self.process_sync_backend(config)
self.process_backend(config) self.process_backend(config)
self.process_session(config) self.process_session(config)
if self.include_sync:
self.process_sync(config)
if self.include_queue:
self.process_chaind_queue(config)
self.process_dispatch(config)
self.process_token(config)
self.process_socket(config) self.process_socket(config)

View File

@@ -7,6 +7,11 @@ from chainqueue.cache import CacheTokenTx
from chainlib.status import Status as TxStatus from chainlib.status import Status as TxStatus
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.error import RPCException from chainlib.error import RPCException
from chainlib.tx import (
Tx,
TxResult,
)
from chainlib.block import Block
class MockCacheAdapter(CacheTokenTx): class MockCacheAdapter(CacheTokenTx):
@@ -34,8 +39,17 @@ class MockDispatcher:
pass pass
class MockTx: class MockTx(Tx):
def __init__(self, tx_hash, status=TxStatus.SUCCESS): def __init__(self, tx_hash, status=TxStatus.SUCCESS):
self.hash = tx_hash result = TxResult()
self.status = status result.status = status
super(MockTx, self).__init__(result=result)
self.set_hash(tx_hash)
class MockBlock(Block):
def __init__(self, number):
super(MockBlock, self).__init__()
self.number = number

View File

@@ -1,6 +1,6 @@
chainlib~=0.1.2 chainlib~=0.2.0
chainqueue~=0.1.12 chainqueue~=0.1.16
chainsyncer~=0.4.4 chainsyncer~=0.4.9
confini~=0.6.0 confini~=0.6.0
funga~=0.5.2 funga~=0.5.2
pyxdg~=0.26 pyxdg~=0.26

View File

@@ -1,6 +1,6 @@
[metadata] [metadata]
name = chaind name = chaind
version = 0.2.8 version = 0.2.12
description = Base package for chain queue service description = Base package for chain queue service
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no

View File

@@ -14,6 +14,7 @@ from chaind.filter import StateFilter
# test imports # test imports
from chaind.unittest.common import ( from chaind.unittest.common import (
MockTx, MockTx,
MockBlock,
MockCacheAdapter, MockCacheAdapter,
MockDispatcher, MockDispatcher,
) )
@@ -76,7 +77,8 @@ class TestChaindFs(TestChaindFsBase):
fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter) fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
tx = MockTx(hsh) tx = MockTx(hsh)
fltr.filter(None, None, tx) block = MockBlock(42)
fltr.filter(None, block, tx)
def test_fs_filter_fail(self): def test_fs_filter_fail(self):
@@ -87,7 +89,8 @@ class TestChaindFs(TestChaindFsBase):
fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter) fltr = StateFilter(self.chain_spec, self.path, MockCacheAdapter)
tx = MockTx(hsh, TxStatus.ERROR) tx = MockTx(hsh, TxStatus.ERROR)
fltr.filter(None, None, tx) block = MockBlock(42)
fltr.filter(None, block, tx)
def test_upcoming(self): def test_upcoming(self):