diff --git a/apps/cic-eth/cic_eth/runnable/daemons/retry.py b/apps/cic-eth/cic_eth/runnable/daemons/retry.py index 6fb94784..834e1afe 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/retry.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/retry.py @@ -1,3 +1,4 @@ +# standard imports import os import sys import logging @@ -5,22 +6,28 @@ import argparse import re import datetime -import web3 +# external imports import confini import celery from cic_eth_registry import CICRegistry from chainlib.chain import ChainSpec +from chainlib.eth.tx import unpack +from chainlib.connection import RPCConnection +from chainlib.eth.block import block_latest +from chainsyncer.driver import HeadSyncer +from chainsyncer.backend import MemBackend +# local imports from cic_eth.db import dsn_from_config from cic_eth.db import SessionBase -from cic_eth.eth import RpcClient -from cic_eth.sync.retry import RetrySyncer from cic_eth.queue.tx import get_status_tx from cic_eth.queue.tx import get_tx from cic_eth.admin.ctrl import lock_send -from cic_eth.db.enum import StatusEnum -from cic_eth.db.enum import LockEnum -from cic_eth.eth.util import unpack_signed_raw_tx_hex +from cic_eth.db.enum import ( + StatusEnum, + StatusBits, + LockEnum, + ) logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -51,7 +58,6 @@ config.process() # override args args_override = { 'ETH_PROVIDER': getattr(args, 'p'), - 'ETH_ABI_DIR': getattr(args, 'abi_dir'), 'CIC_CHAIN_SPEC': getattr(args, 'i'), 'CIC_TX_RETRY_DELAY': getattr(args, 'retry_delay'), } @@ -66,7 +72,7 @@ queue = args.q chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) -RPCConnection.registry_location(args.p, chain_spec, tag='default') +RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default') dsn = dsn_from_config(config) SessionBase.connect(dsn) @@ -178,53 +184,60 @@ def dispatch(conn, chain_spec): # s_send.apply_async() -class RetrySyncer(Syncer): +class FooFilter: - def __init__(self, chain_spec, stalled_grace_seconds, failed_grace_seconds=None, final_func=None): + def __init__(self, chain_spec, queue='cic-eth'): + self.chain_spec = chain_spec + self.queue = queue + + + def filter(self, conn, block, tx, db_session=None): + s_send = celery.signature( + 'cic_eth.eth.resend_with_higher_gas', + [ + [tx], + self.chain_spec.asdict(), + ], + queue=self.queue, + ) + s_send.apply_async() + + +class RetrySyncer(HeadSyncer): + + def __init__(self, conn, chain_spec, stalled_grace_seconds, batch_size=50, failed_grace_seconds=None): + backend = MemBackend(chain_spec, None) + super(RetrySyncer, self).__init__(backend) self.chain_spec = chain_spec if failed_grace_seconds == None: failed_grace_seconds = stalled_grace_seconds self.stalled_grace_seconds = stalled_grace_seconds self.failed_grace_seconds = failed_grace_seconds - self.final_func = final_func + self.batch_size = batch_size + self.conn = conn - def get(self): -# before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.failed_grace_seconds) -# failed_txs = get_status_tx( -# StatusEnum.SENDFAIL.value, -# before=before, -# ) - before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds) - stalled_txs = get_status_tx( - StatusBits.IN_NETWORK.value, - not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE, - before=before, - ) - # return list(failed_txs.keys()) + list(stalled_txs.keys()) - return stalled_txs + def process(self, conn, block): + before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds) + stalled_txs = get_status_tx( + StatusBits.IN_NETWORK.value, + not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE, + before=before, + limit=self.batch_size, + ) + for tx in stalled_txs: + self.filter.apply(self.conn, block, tx) + self.backend.set(block.number + 1, 0) - def process(self, conn, ref): - logg.debug('tx {}'.format(ref)) - for f in self.filter: - f(conn, ref, None, str(self.chain_spec)) - - - - def loop(self, interval): - while self.running and Syncer.running_global: - rpc = RPCConnection.connect(self.chain_spec, 'default') - for tx in self.get(): - self.process(rpc, tx) - if self.final_func != None: - self.final_func(rpc, self.chain_spec) - time.sleep(interval) def main(): - - syncer = RetrySyncer(chain_spec, straggler_delay, final_func=dispatch) - syncer.filter.append(sendfail_filter) - syncer.loop(float(straggler_delay)) + o = block_latest() + conn = RPCConnection.connect(chain_spec, 'default') + block = conn.do(o) + syncer = RetrySyncer(conn, chain_spec, straggler_delay) + syncer.backend.set(int(block, 16), 0) + syncer.add_filter(FooFilter(chain_spec, queue=queue)) + syncer.loop(float(straggler_delay), conn) if __name__ == '__main__': diff --git a/docker-compose.yml b/docker-compose.yml index 6fa6e8a1..5adf741f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -373,7 +373,7 @@ services: - -c - | if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi - ./start_retry.sh -v + ./start_retry.sh -vv # command: "/root/start_retry.sh -q cic-eth -vv"