Implement retry on failed sends

This commit is contained in:
nolash 2021-08-26 10:03:51 +02:00
parent 95f1f0d0a3
commit fc69b2762b
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
7 changed files with 27 additions and 12 deletions

View File

@ -3,6 +3,7 @@ socket_path =
runtime_dir =
id =
data_dir =
dispatch_delay = 4.0
[database]
engine =

View File

@ -56,11 +56,13 @@ class Dispatcher:
logg.debug('too many inflight txs for {}, skipping {}'.format(sender, k))
continue
logg.debug('processing tx {} {}'.format(k, txs[k]))
r = 0
try:
self.adapter.dispatch(self.chain_spec, rpc, k, txs[k], session)
r = self.adapter.dispatch(self.chain_spec, rpc, k, txs[k], session)
except JSONRPCException as e:
logg.error('dispatch failed for {}: {}'.format(k, e))
continue
self.inc_count(sender, session)
c += 1
if r == 0:
self.inc_count(sender, session)
c += 1
return c

View File

@ -35,11 +35,13 @@ argparser = chainlib.eth.cli.ArgumentParser(arg_flags)
argparser.add_argument('--data-dir', type=str, help='data directory')
argparser.add_argument('--runtime-dir', type=str, help='runtime directory')
argparser.add_argument('--session-id', dest='session_id', type=str, help='session identifier')
argparser.add_argument('--dispatch-delay', dest='dispatch_delay', type=float, help='socket timeout before processing queue')
args = argparser.parse_args()
extra_args = {
'runtime_dir': 'SESSION_RUNTIME_DIR',
'data_dir': 'SESSION_DATA_DIR',
'session_id': 'SESSION_ID',
'dispatch_delay': 'SESSION_DISPATCH_DELAY',
}
#config = chainlib.eth.cli.Config.from_args(args, arg_flags, default_config_dir=config_dir, extend_base_config_dir=config_dir)
config = chainlib.eth.cli.Config.from_args(args, arg_flags, extra_args=extra_args, base_config_dir=config_dir)
@ -56,7 +58,8 @@ if not config.get('SESSION_SOCKET_PATH'):
config.add(socket_path, 'SESSION_SOCKET_PATH', True)
if config.get('DATABASE_ENGINE') == 'sqlite':
config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True)
#config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True)
config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME')), 'DATABASE_NAME', exists_ok=True)
config.censor('PASSWORD', 'DATABASE')
logg.debug('config loaded:\n{}'.format(config))
@ -83,7 +86,8 @@ class SessionController:
self.srv = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
self.srv.bind(config.get('SESSION_SOCKET_PATH'))
self.srv.listen(2)
self.srv.settimeout(4.0)
self.srv.settimeout(float(config.get('SESSION_DISPATCH_DELAY')))
def shutdown(self, signo, frame):
if self.dead:

View File

@ -64,7 +64,7 @@ if not config.get('SESSION_SOCKET_PATH'):
config.add(socket_path, 'SESSION_SOCKET_PATH', True)
if config.get('DATABASE_ENGINE') == 'sqlite':
config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME') + '.sqlite'), 'DATABASE_NAME', exists_ok=True)
config.add(os.path.join(config.get('SESSION_DATA_DIR'), config.get('DATABASE_NAME')), 'DATABASE_NAME', exists_ok=True)
config.censor('PASSWORD', 'DATABASE')
logg.debug('config loaded:\n{}'.format(config))

View File

@ -1,5 +1,6 @@
# standard imports
import logging
import datetime
# external imports
from chainlib.eth.constant import ZERO_ADDRESS
@ -11,13 +12,15 @@ from hexathon import (
add_0x,
strip_0x,
)
from chainqueue.enum import StatusBits
from chainqueue.enum import (
StatusBits,
errors as queue_errors,
)
# local imports
from chainqueue.adapters.base import Adapter
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
logg = logging.getLogger(__name__)
class EthAdapter(Adapter):
@ -39,7 +42,12 @@ class EthAdapter(Adapter):
def upcoming(self, chain_spec, session=None):
return self.backend.get(chain_spec, StatusBits.QUEUED, self.translate) # possible maldesign, up-stack should use our session?
txs = self.backend.get(chain_spec, self.translate, session=session, status=StatusBits.QUEUED, not_status=StatusBits.IN_NETWORK)
before = datetime.datetime.utcnow() - self.error_retry_threshold
errored_txs = self.backend.get(chain_spec, self.translate, session=session, status=StatusBits.LOCAL_ERROR, not_status=StatusBits.FINAL, before=before, requeue=True)
for tx_hash in errored_txs.keys():
txs[tx_hash] = errored_txs[tx_hash]
return txs
def add(self, bytecode, chain_spec, session=None):

View File

@ -1,3 +1,3 @@
chaind<=0.0.1,>=0.0.1a8
chaind<=0.0.2,>=0.0.2a3
hexathon~=0.0.1a8
chainlib-eth<=0.1.0,>=0.0.9a4

View File

@ -1,6 +1,6 @@
[metadata]
name = chaind-eth
version = 0.0.1a6
version = 0.0.2a1
description = Queue server for ethereum
author = Louis Holbrook
author_email = dev@holbrook.no