From fc69b2762bf8c9f4686f32340437d180bf8a4e70 Mon Sep 17 00:00:00 2001 From: nolash Date: Thu, 26 Aug 2021 10:03:51 +0200 Subject: [PATCH] Implement retry on failed sends --- chaind_eth/data/config/config.ini | 1 + chaind_eth/dispatch.py | 8 +++++--- chaind_eth/runnable/server.py | 8 ++++++-- chaind_eth/runnable/syncer.py | 2 +- chainqueue/adapters/eth.py | 16 ++++++++++++---- requirements.txt | 2 +- setup.cfg | 2 +- 7 files changed, 27 insertions(+), 12 deletions(-) diff --git a/chaind_eth/data/config/config.ini b/chaind_eth/data/config/config.ini index d8ce0d0..749f003 100644 --- a/chaind_eth/data/config/config.ini +++ b/chaind_eth/data/config/config.ini @@ -3,6 +3,7 @@ socket_path = runtime_dir = id = data_dir = +dispatch_delay = 4.0 [database] engine = diff --git a/chaind_eth/dispatch.py b/chaind_eth/dispatch.py index 4e051d2..89f1420 100644 --- a/chaind_eth/dispatch.py +++ b/chaind_eth/dispatch.py @@ -56,11 +56,13 @@ class Dispatcher: logg.debug('too many inflight txs for {}, skipping {}'.format(sender, k)) continue logg.debug('processing tx {} {}'.format(k, txs[k])) + r = 0 try: - self.adapter.dispatch(self.chain_spec, rpc, k, txs[k], session) + r = self.adapter.dispatch(self.chain_spec, rpc, k, txs[k], session) except JSONRPCException as e: logg.error('dispatch failed for {}: {}'.format(k, e)) continue - self.inc_count(sender, session) - c += 1 + if r == 0: + self.inc_count(sender, session) + c += 1 return c diff --git a/chaind_eth/runnable/server.py b/chaind_eth/runnable/server.py index e462e2b..08f43be 100644 --- a/chaind_eth/runnable/server.py +++ b/chaind_eth/runnable/server.py @@ -35,11 +35,13 @@ argparser = chainlib.eth.cli.ArgumentParser(arg_flags) argparser.add_argument('--data-dir', type=str, help='data directory') argparser.add_argument('--runtime-dir', type=str, help='runtime directory') argparser.add_argument('--session-id', dest='session_id', type=str, help='session identifier') +argparser.add_argument('--dispatch-delay', dest='dispatch_delay', type=float, help='socket timeout before processing queue') args = argparser.parse_args() extra_args = { 'runtime_dir': 'SESSION_RUNTIME_DIR', 'data_dir': 'SESSION_DATA_DIR', 'session_id': 'SESSION_ID', + 'dispatch_delay': 'SESSION_DISPATCH_DELAY', } #config = chainlib.eth.cli.Config.from_args(args, arg_flags, default_config_dir=config_dir, extend_base_config_dir=config_dir) config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir) @@ -56,7 +58,8 @@ if not config.get('SESSION_SOCKET_PATH'): config.add(socket_path, 'SESSION_SOCKET_PATH', True) if config.get('DATABASE_ENGINE') == 'sqlite': - config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True) + #config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True) + config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME')), 'DATABASE_NAME', exists_ok=True) config.censor('PASSWORD', 'DATABASE') logg.debug('config loaded:\n{}'.format(config)) @@ -83,7 +86,8 @@ class SessionController: self.srv = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) self.srv.bind(config.get('SESSION_SOCKET_PATH')) self.srv.listen(2) - self.srv.settimeout(4.0) + self.srv.settimeout(float(config.get('SESSION_DISPATCH_DELAY'))) + def shutdown(self, signo, frame): if self.dead: diff --git a/chaind_eth/runnable/syncer.py b/chaind_eth/runnable/syncer.py index 4ed12ff..2440cc6 100644 --- a/chaind_eth/runnable/syncer.py +++ b/chaind_eth/runnable/syncer.py @@ -64,7 +64,7 @@ if not config.get('SESSION_SOCKET_PATH'): config.add(socket_path, 'SESSION_SOCKET_PATH', True) if config.get('DATABASE_ENGINE') == 'sqlite': - config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True) + config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME')), 'DATABASE_NAME', exists_ok=True) config.censor('PASSWORD', 'DATABASE') logg.debug('config loaded:\n{}'.format(config)) diff --git a/chainqueue/adapters/eth.py b/chainqueue/adapters/eth.py index 0ca5b02..118cf44 100644 --- a/chainqueue/adapters/eth.py +++ b/chainqueue/adapters/eth.py @@ -1,5 +1,6 @@ # standard imports import logging +import datetime # external imports from chainlib.eth.constant import ZERO_ADDRESS @@ -11,13 +12,15 @@ from hexathon import ( add_0x, strip_0x, ) -from chainqueue.enum import StatusBits +from chainqueue.enum import ( + StatusBits, + errors as queue_errors, + ) # local imports from chainqueue.adapters.base import Adapter -#logg = logging.getLogger(__name__) -logg = logging.getLogger() +logg = logging.getLogger(__name__) class EthAdapter(Adapter): @@ -39,7 +42,12 @@ class EthAdapter(Adapter): def upcoming(self, chain_spec, session=None): - return self.backend.get(chain_spec, StatusBits.QUEUED, self.translate) # possible maldesign, up-stack should use our session? + txs = self.backend.get(chain_spec, self.translate, session=session, status=StatusBits.QUEUED, not_status=StatusBits.IN_NETWORK) + before = datetime.datetime.utcnow() - self.error_retry_threshold + errored_txs = self.backend.get(chain_spec, self.translate, session=session, status=StatusBits.LOCAL_ERROR, not_status=StatusBits.FINAL, before=before, requeue=True) + for tx_hash in errored_txs.keys(): + txs[tx_hash] = errored_txs[tx_hash] + return txs def add(self, bytecode, chain_spec, session=None): diff --git a/requirements.txt b/requirements.txt index dcd3208..01fcbb1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -chaind<=0.0.1,>=0.0.1a8 +chaind<=0.0.2,>=0.0.2a3 hexathon~=0.0.1a8 chainlib-eth<=0.1.0,>=0.0.9a4 diff --git a/setup.cfg b/setup.cfg index f40263d..dc3bc00 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = chaind-eth -version = 0.0.1a6 +version = 0.0.2a1 description = Queue server for ethereum author = Louis Holbrook author_email = dev@holbrook.no