Catch backend race

This commit is contained in:
lash 2022-04-30 18:44:52 +00:00
parent 097a6f4b53
commit 9522729fa0
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
4 changed files with 23 additions and 14 deletions

View File

@ -67,10 +67,26 @@ settings.process(config)
logg.debug('settings:\n{}'.format(settings)) logg.debug('settings:\n{}'.format(settings))
def process_outgoing(chain_spec, adapter, rpc, limit=100): def process_outgoing(chain_spec, adapter, rpc, limit=50):
upcoming = adapter.upcoming() adapter = None
logg.info('process {} {} {}'.format(chain_spec, adapter, rpc)) process_err = None
logg.info('upcoming {}'.format(upcoming)) for i in range(2):
try:
adapter = ChaindFsAdapter(
settings.get('CHAIN_SPEC'),
settings.dir_for('queue'),
EthCacheTx,
dispatcher,
)
except BackendIntegrityError as e:
process_err = e
continue
if adapter == None:
raise BackendIntegrityError(process_err)
upcoming = adapter.upcoming(limit=limit)
logg.info('processor has {} candidates for {}, processing with limit {} adapter {} rpc {}'.format(len(upcoming), chain_spec, limit, adapter, rpc))
i = 0 i = 0
for tx_hash in upcoming: for tx_hash in upcoming:
if adapter.dispatch(tx_hash): if adapter.dispatch(tx_hash):

View File

@ -8,7 +8,6 @@ import chainsyncer.cli
import chaind.cli import chaind.cli
from chaind.setup import Environment from chaind.setup import Environment
from chaind.filter import StateFilter from chaind.filter import StateFilter
from chaind.adapters.fs import ChaindFsAdapter
from chainlib.eth.block import block_latest from chainlib.eth.block import block_latest
from hexathon import strip_0x from hexathon import strip_0x
from chainsyncer.store.fs import SyncFsStore from chainsyncer.store.fs import SyncFsStore
@ -59,13 +58,7 @@ logg.debug('settings:\n{}'.format(settings))
def main(): def main():
queue_adapter = ChaindFsAdapter( fltr = StateFilter(settings.get('CHAIN_SPEC'), settings.dir_for('queue'), EthCacheTx)
settings.get('CHAIN_SPEC'),
settings.dir_for('queue'),
EthCacheTx,
None,
)
fltr = StateFilter(queue_adapter)
sync_store = SyncFsStore(settings.get('SESSION_DATA_DIR'), session_id=settings.get('SESSION_ID')) sync_store = SyncFsStore(settings.get('SESSION_DATA_DIR'), session_id=settings.get('SESSION_ID'))
sync_store.register(fltr) sync_store.register(fltr)

View File

@ -1,4 +1,4 @@
chaind~=0.1.3 chaind~=0.2.0
hexathon~=0.1.5 hexathon~=0.1.5
chainlib-eth~=0.1.1 chainlib-eth~=0.1.1
pyxdg~=0.27 pyxdg~=0.27

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = chaind-eth name = chaind-eth
version = 0.1.5 version = 0.2.0
description = Queue server for ethereum description = Queue server for ethereum
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no