From 810f9fe99409791a23152fa4d85649fd043db61e Mon Sep 17 00:00:00 2001 From: Louis Holbrook Date: Fri, 2 Apr 2021 13:16:27 +0000 Subject: [PATCH] Rehabilitate retrier --- apps/cic-eth/cic_eth/api/api_admin.py | 19 ++- apps/cic-eth/cic_eth/eth/tx.py | 59 +++++--- apps/cic-eth/cic_eth/queue/tx.py | 13 +- .../cic-eth/cic_eth/runnable/daemons/retry.py | 140 ++++++++++++------ apps/cic-eth/cic_eth/runnable/resend.py | 40 +---- apps/cic-eth/tests/task/test_task_tx.py | 44 ++++++ .../scripts/import_users.py | 2 +- docker-compose.yml | 8 +- 8 files changed, 212 insertions(+), 113 deletions(-) diff --git a/apps/cic-eth/cic_eth/api/api_admin.py b/apps/cic-eth/cic_eth/api/api_admin.py index bcc6174e..458b03fb 100644 --- a/apps/cic-eth/cic_eth/api/api_admin.py +++ b/apps/cic-eth/cic_eth/api/api_admin.py @@ -134,7 +134,8 @@ class AdminApi: return s_have.apply_async() - def resend(self, tx_hash_hex, chain_str, in_place=True, unlock=False): + def resend(self, tx_hash_hex, chain_spec, in_place=True, unlock=False): + logg.debug('resend {}'.format(tx_hash_hex)) s_get_tx_cache = celery.signature( 'cic_eth.queue.tx.get_tx_cache', @@ -156,7 +157,7 @@ class AdminApi: s = celery.signature( 'cic_eth.eth.tx.resend_with_higher_gas', [ - chain_str, + chain_spec.asdict(), None, 1.01, ], @@ -176,7 +177,7 @@ class AdminApi: s_gas = celery.signature( 'cic_eth.admin.ctrl.unlock_send', [ - chain_str, + chain_spec.asdict(), tx_dict['sender'], ], queue=self.queue, @@ -218,7 +219,7 @@ class AdminApi: blocking_tx = k blocking_nonce = nonce_otx elif nonce_otx - last_nonce > 1: - logg.error('nonce gap; {} followed {}'.format(nonce_otx, last_nonce)) + logg.error('nonce gap; {} followed {} for account {}'.format(nonce_otx, last_nonce, tx['from'])) blocking_tx = k blocking_nonce = nonce_otx break @@ -312,10 +313,10 @@ class AdminApi: tx_dict = s.apply_async().get() if tx_dict['sender'] == address: if tx_dict['nonce'] - last_nonce > 1: - logg.error('nonce gap; {} followed {} for tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['hash'])) + logg.error('nonce gap; {} followed {} for address {} tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['sender'], tx_hash)) errors.append('nonce') elif tx_dict['nonce'] == last_nonce: - logg.warning('nonce {} duplicate in tx {}'.format(tx_dict['nonce'], tx_dict['hash'])) + logg.info('nonce {} duplicate for address {} in tx {}'.format(tx_dict['nonce'], tx_dict['sender'], tx_hash)) last_nonce = tx_dict['nonce'] if not include_sender: logg.debug('skipping sender tx {}'.format(tx_dict['tx_hash'])) @@ -480,15 +481,17 @@ class AdminApi: tx['destination_token_symbol'] = destination_token.symbol() tx['recipient_token_balance'] = source_token.function('balanceOf')(tx['recipient']).call() - tx['network_status'] = 'Not submitted' + # TODO: this can mean either not subitted or culled, need to check other txs with same nonce to determine which + tx['network_status'] = 'Not in node' r = None try: o = transaction(tx_hash) r = self.rpc.do(o) + if r != None: + tx['network_status'] = 'Mempool' except Exception as e: logg.warning('(too permissive exception handler, please fix!) {}'.format(e)) - tx['network_status'] = 'Mempool' if r != None: try: diff --git a/apps/cic-eth/cic_eth/eth/tx.py b/apps/cic-eth/cic_eth/eth/tx.py index aef7d537..a930b847 100644 --- a/apps/cic-eth/cic_eth/eth/tx.py +++ b/apps/cic-eth/cic_eth/eth/tx.py @@ -7,7 +7,10 @@ import requests from chainlib.eth.constant import ZERO_ADDRESS from chainlib.chain import ChainSpec from chainlib.eth.address import is_checksum_address -from chainlib.eth.gas import balance +from chainlib.eth.gas import ( + balance, + price, + ) from chainlib.eth.error import ( EthException, NotFoundEthException, @@ -17,11 +20,15 @@ from chainlib.eth.tx import ( receipt, raw, TxFormat, + TxFactory, unpack, ) from chainlib.connection import RPCConnection from chainlib.hash import keccak256_hex_to_hex -from chainlib.eth.gas import Gas +from chainlib.eth.gas import ( + Gas, + OverrideGasOracle, + ) from chainlib.eth.contract import ( abi_decode_single, ABIContractType, @@ -52,6 +59,7 @@ from cic_eth.queue.tx import ( get_tx, register_tx, get_nonce_tx, + create as queue_create, ) from cic_eth.error import OutOfGasError from cic_eth.error import LockedError @@ -370,7 +378,7 @@ def refill_gas(self, recipient_address, chain_spec_dict): @celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask) -def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_factor=1.1): +def resend_with_higher_gas(self, txold_hash_hex, chain_spec_dict, gas=None, default_factor=1.1): """Create a new transaction from an existing one with same nonce and higher gas price. :param txold_hash_hex: Transaction to re-create @@ -394,46 +402,55 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa session.close() raise NotLocalTxError(txold_hash_hex) - chain_spec = ChainSpec.from_chain_str(chain_str) - c = RpcClient(chain_spec) + chain_spec = ChainSpec.from_dict(chain_spec_dict) tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx[2:]) tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id()) logg.debug('resend otx {} {}'.format(tx, otx.signed_tx)) - queue = self.request.delivery_info['routing_key'] + queue = self.request.delivery_info.get('routing_key') logg.debug('before {}'.format(tx)) - if gas != None: - tx['gasPrice'] = gas - else: - gas_price = c.gas_price() - if tx['gasPrice'] > gas_price: - logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(gas_price, tx['gasPrice'])) + + rpc = RPCConnection.connect(chain_spec, 'default') + new_gas_price = gas + if new_gas_price == None: + o = price() + r = rpc.do(o) + current_gas_price = int(r, 16) + if tx['gasPrice'] > current_gas_price: + logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(curent_gas_price, tx['gasPrice'])) #tx['gasPrice'] = int(tx['gasPrice'] * default_factor) - tx['gasPrice'] += 1 + new_gas_price = tx['gasPrice'] + 1 else: new_gas_price = int(tx['gasPrice'] * default_factor) - if gas_price > new_gas_price: - tx['gasPrice'] = gas_price - else: - tx['gasPrice'] = new_gas_price + #if gas_price > new_gas_price: + # tx['gasPrice'] = gas_price + #else: + # tx['gasPrice'] = new_gas_price - (tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str) + + rpc_signer = RPCConnection.connect(chain_spec, 'signer') + gas_oracle = OverrideGasOracle(price=new_gas_price, conn=rpc) + + c = TxFactory(signer=rpc_signer, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id()) + logg.debug('change gas price from old {} to new {} for tx {}'.format(tx['gasPrice'], new_gas_price, tx)) + tx['gasPrice'] = new_gas_price + (tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx) queue_create( tx['nonce'], tx['from'], tx_hash_hex, tx_signed_raw_hex, - chain_str, + chain_spec, session=session, ) TxCache.clone(txold_hash_hex, tx_hash_hex, session=session) session.close() - s = create_check_gas_and_send_task( + s = create_check_gas_task( [tx_signed_raw_hex], - chain_str, + chain_spec, tx['from'], tx['gasPrice'] * tx['gas'], [tx_hash_hex], diff --git a/apps/cic-eth/cic_eth/queue/tx.py b/apps/cic-eth/cic_eth/queue/tx.py index 6b228cc0..66b4e3a2 100644 --- a/apps/cic-eth/cic_eth/queue/tx.py +++ b/apps/cic-eth/cic_eth/queue/tx.py @@ -676,6 +676,7 @@ def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, se q = q.filter(Otx.status.op('&')(status)>0) if not_status != None: q = q.filter(Otx.status.op('&')(not_status)==0) + q = q.order_by(Otx.nonce.asc(), Otx.date_created.asc()) i = 0 for o in q.all(): if limit > 0 and i == limit: @@ -687,7 +688,7 @@ def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, se # TODO: move query to model -def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, chain_id=0, session=None): +def get_upcoming_tx(status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, chain_id=0, session=None): """Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions. Will omit addresses that have the LockEnum.SEND bit in Lock set. @@ -721,7 +722,10 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch if status == StatusEnum.PENDING: q_outer = q_outer.filter(Otx.status==status.value) else: - q_outer = q_outer.filter(Otx.status.op('&')(status.value)==status.value) + q_outer = q_outer.filter(Otx.status.op('&')(status)==status) + + if not_status != None: + q_outer = q_outer.filter(Otx.status.op('&')(not_status)==0) if recipient != None: q_outer = q_outer.filter(TxCache.recipient==recipient) @@ -730,6 +734,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch txs = {} + i = 0 for r in q_outer.all(): q = session.query(Otx) q = q.join(TxCache) @@ -758,6 +763,10 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch session.add(o) session.commit() + i += 1 + if limit > 0 and limit == i: + break + SessionBase.release_session(session) return txs diff --git a/apps/cic-eth/cic_eth/runnable/daemons/retry.py b/apps/cic-eth/cic_eth/runnable/daemons/retry.py index 6fb94784..b7889a32 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/retry.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/retry.py @@ -1,3 +1,4 @@ +# standard imports import os import sys import logging @@ -5,22 +6,36 @@ import argparse import re import datetime -import web3 +# external imports import confini import celery from cic_eth_registry import CICRegistry from chainlib.chain import ChainSpec +from chainlib.eth.tx import unpack +from chainlib.connection import RPCConnection +from chainlib.eth.block import ( + block_latest, + block_by_number, + Block, + ) +from chainsyncer.driver import HeadSyncer +from chainsyncer.backend import MemBackend +from chainsyncer.error import NoBlockForYou +# local imports from cic_eth.db import dsn_from_config from cic_eth.db import SessionBase -from cic_eth.eth import RpcClient -from cic_eth.sync.retry import RetrySyncer -from cic_eth.queue.tx import get_status_tx -from cic_eth.queue.tx import get_tx +from cic_eth.queue.tx import ( + get_status_tx, + get_tx, +# get_upcoming_tx, + ) from cic_eth.admin.ctrl import lock_send -from cic_eth.db.enum import StatusEnum -from cic_eth.db.enum import LockEnum -from cic_eth.eth.util import unpack_signed_raw_tx_hex +from cic_eth.db.enum import ( + StatusEnum, + StatusBits, + LockEnum, + ) logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() @@ -31,7 +46,8 @@ argparser = argparse.ArgumentParser(description='daemon that monitors transactio argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') -argparser.add_argument('--retry-delay', dest='retry_delay', type=str, help='seconds to wait for retrying a transaction that is marked as sent') +argparser.add_argument('--batch-size', dest='batch_size', type=int, default=50, help='max amount of txs to resend per iteration') +argparser.add_argument('--retry-delay', dest='retry_delay', type=int, help='seconds to wait for retrying a transaction that is marked as sent') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') argparser.add_argument('-v', help='be verbose', action='store_true') @@ -51,7 +67,6 @@ config.process() # override args args_override = { 'ETH_PROVIDER': getattr(args, 'p'), - 'ETH_ABI_DIR': getattr(args, 'abi_dir'), 'CIC_CHAIN_SPEC': getattr(args, 'i'), 'CIC_TX_RETRY_DELAY': getattr(args, 'retry_delay'), } @@ -59,6 +74,7 @@ config.dict_override(args_override, 'cli flag') config.censor('PASSWORD', 'DATABASE') config.censor('PASSWORD', 'SSL') logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) +config.add(args.batch_size, '_BATCH_SIZE', True) app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL')) @@ -66,10 +82,10 @@ queue = args.q chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) -RPCConnection.registry_location(args.p, chain_spec, tag='default') +RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default') dsn = dsn_from_config(config) -SessionBase.connect(dsn) +SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG')) straggler_delay = int(config.get('CIC_TX_RETRY_DELAY')) @@ -178,53 +194,85 @@ def dispatch(conn, chain_spec): # s_send.apply_async() -class RetrySyncer(Syncer): +class StragglerFilter: - def __init__(self, chain_spec, stalled_grace_seconds, failed_grace_seconds=None, final_func=None): + def __init__(self, chain_spec, queue='cic-eth'): + self.chain_spec = chain_spec + self.queue = queue + + + def filter(self, conn, block, tx, db_session=None): + logg.debug('tx {}'.format(tx)) + s_send = celery.signature( + 'cic_eth.eth.tx.resend_with_higher_gas', + [ + tx, + self.chain_spec.asdict(), + ], + queue=self.queue, + ) + return s_send.apply_async() + #return s_send + + + def __str__(self): + return 'stragglerfilter' + + +class RetrySyncer(HeadSyncer): + + def __init__(self, conn, chain_spec, stalled_grace_seconds, batch_size=50, failed_grace_seconds=None): + backend = MemBackend(chain_spec, None) + super(RetrySyncer, self).__init__(backend) self.chain_spec = chain_spec if failed_grace_seconds == None: failed_grace_seconds = stalled_grace_seconds self.stalled_grace_seconds = stalled_grace_seconds self.failed_grace_seconds = failed_grace_seconds - self.final_func = final_func + self.batch_size = batch_size + self.conn = conn - def get(self): -# before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.failed_grace_seconds) -# failed_txs = get_status_tx( -# StatusEnum.SENDFAIL.value, -# before=before, -# ) - before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds) - stalled_txs = get_status_tx( - StatusBits.IN_NETWORK.value, - not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE, - before=before, - ) - # return list(failed_txs.keys()) + list(stalled_txs.keys()) - return stalled_txs - - def process(self, conn, ref): - logg.debug('tx {}'.format(ref)) - for f in self.filter: - f(conn, ref, None, str(self.chain_spec)) + def get(self, conn): + o = block_latest() + r = conn.do(o) + (pair, flags) = self.backend.get() + n = int(r, 16) + if n == pair[0]: + raise NoBlockForYou('block {} already checked'.format(n)) + o = block_by_number(n) + r = conn.do(o) + b = Block(r) + return b + def process(self, conn, block): + before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds) + stalled_txs = get_status_tx( + StatusBits.IN_NETWORK.value, + not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE, + before=before, + limit=self.batch_size, + ) +# stalled_txs = get_upcoming_tx( +# status=StatusBits.IN_NETWORK.value, +# not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE, +# before=before, +# limit=self.batch_size, +# ) + for tx in stalled_txs: + self.filter.apply(self.conn, block, tx) + self.backend.set(block.number, 0) - def loop(self, interval): - while self.running and Syncer.running_global: - rpc = RPCConnection.connect(self.chain_spec, 'default') - for tx in self.get(): - self.process(rpc, tx) - if self.final_func != None: - self.final_func(rpc, self.chain_spec) - time.sleep(interval) def main(): - - syncer = RetrySyncer(chain_spec, straggler_delay, final_func=dispatch) - syncer.filter.append(sendfail_filter) - syncer.loop(float(straggler_delay)) + #o = block_latest() + conn = RPCConnection.connect(chain_spec, 'default') + #block = conn.do(o) + syncer = RetrySyncer(conn, chain_spec, straggler_delay, batch_size=config.get('_BATCH_SIZE')) + syncer.backend.set(0, 0) + syncer.add_filter(StragglerFilter(chain_spec, queue=queue)) + syncer.loop(float(straggler_delay), conn) if __name__ == '__main__': diff --git a/apps/cic-eth/cic_eth/runnable/resend.py b/apps/cic-eth/cic_eth/runnable/resend.py index 77faf95a..7198b8ac 100644 --- a/apps/cic-eth/cic_eth/runnable/resend.py +++ b/apps/cic-eth/cic_eth/runnable/resend.py @@ -7,13 +7,10 @@ import os # third-party imports import celery import confini -import web3 -from cic_registry import CICRegistry -from cic_registry.chain import ChainSpec -from cic_registry.chain import ChainRegistry +from chainlib.chain import ChainSpec +from chainlib.eth.connection import EthHTTPConnection # local imports -from cic_eth.eth.rpc import RpcClient from cic_eth.api.api_admin import AdminApi logging.basicConfig(level=logging.WARNING) @@ -55,41 +52,20 @@ args_override = { config.censor('PASSWORD', 'DATABASE') config.censor('PASSWORD', 'SSL') logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) +config.add(args.tx_hash, '_TX_HASH', True) +config.add(args.unlock, '_UNLOCK', True) chain_spec = ChainSpec.from_chain_str(args.i) -chain_str = str(chain_spec) -re_websocket = re.compile('^wss?://') -re_http = re.compile('^https?://') -blockchain_provider = config.get('ETH_PROVIDER') -if re.match(re_websocket, blockchain_provider) != None: - blockchain_provider = web3.Web3.WebsocketProvider(blockchain_provider) -elif re.match(re_http, blockchain_provider) != None: - blockchain_provider = web3.Web3.HTTPProvider(blockchain_provider) -else: - raise ValueError('unknown provider url {}'.format(blockchain_provider)) - -def web3_constructor(): - w3 = web3.Web3(blockchain_provider) - return (blockchain_provider, w3) -RpcClient.set_constructor(web3_constructor) +rpc = EthHTTPConnection(config.get('ETH_PROVIDER')) celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) -c = RpcClient(chain_spec) - -CICRegistry.init(c.w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec) -chain_registry = ChainRegistry(chain_spec) -CICRegistry.add_chain_registry(chain_registry) -CICRegistry.add_path(config.get('ETH_ABI_DIR')) -CICRegistry.load_for(chain_spec) - - def main(): - api = AdminApi(c) + api = AdminApi(rpc) tx_details = api.tx(chain_spec, args.tx_hash) - t = api.resend(args.tx_hash, chain_str, unlock=True) - + t = api.resend(args.tx_hash, chain_spec, unlock=config.get('_UNLOCK')) + print(t.get_leaf()) if __name__ == '__main__': main() diff --git a/apps/cic-eth/tests/task/test_task_tx.py b/apps/cic-eth/tests/task/test_task_tx.py index 2f223c54..99d1a3d0 100644 --- a/apps/cic-eth/tests/task/test_task_tx.py +++ b/apps/cic-eth/tests/task/test_task_tx.py @@ -12,10 +12,12 @@ from chainlib.eth.tx import ( transaction, receipt, ) +from hexathon import strip_0x # local imports from cic_eth.queue.tx import register_tx from cic_eth.eth.tx import cache_gas_data +from cic_eth.db.models.otx import Otx logg = logging.getLogger() @@ -60,6 +62,7 @@ def test_tx_send( assert rcpt['status'] == 1 +@pytest.mark.skip() def test_sync_tx( default_chain_spec, eth_rpc, @@ -67,3 +70,44 @@ def test_sync_tx( celery_worker, ): pass + + +def test_resend_with_higher_gas( + init_database, + default_chain_spec, + eth_rpc, + eth_signer, + agent_roles, + celery_worker, + ): + + chain_id = default_chain_spec.chain_id() + nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc) + c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id) + (tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024, tx_format=TxFormat.RLP_SIGNED) + #unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), chain_id) + register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) + cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) + tx_before = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), default_chain_spec.chain_id()) + + s = celery.signature( + 'cic_eth.eth.tx.resend_with_higher_gas', + [ + tx_hash_hex, + default_chain_spec.asdict(), + ], + queue=None, + ) + t = s.apply_async() + r = t.get_leaf() + + q = init_database.query(Otx) + q = q.filter(Otx.tx_hash==r) + otx = q.first() + if otx == None: + raise NotLocalTxError(r) + + tx_after = unpack(bytes.fromhex(strip_0x(otx.signed_tx)), default_chain_spec.chain_id()) + logg.debug('gasprices before {} after {}'.format(tx_before['gasPrice'], tx_after['gasPrice'])) + assert tx_after['gasPrice'] > tx_before['gasPrice'] + diff --git a/apps/contract-migration/scripts/import_users.py b/apps/contract-migration/scripts/import_users.py index 8264a319..79f370a7 100644 --- a/apps/contract-migration/scripts/import_users.py +++ b/apps/contract-migration/scripts/import_users.py @@ -36,7 +36,7 @@ argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback') argparser.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback') argparser.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback') -argparser.add_argument('--batch-size', dest='batch_size', default=50, type=int, help='burst size of sending transactions to node') +argparser.add_argument('--batch-size', dest='batch_size', default=80, type=int, help='burst size of sending transactions to node') argparser.add_argument('--batch-delay', dest='batch_delay', default=2, type=int, help='seconds delay between batches') argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout') argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue') diff --git a/docker-compose.yml b/docker-compose.yml index 5adf741f..b0a29776 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -317,8 +317,8 @@ services: CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis} CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis} TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS - #DATABASE_DEBUG: ${DATABASE_DEBUG:-false} - DATABASE_DEBUG: 1 + DATABASE_DEBUG: ${DATABASE_DEBUG:-false} + #DATABASE_DEBUG: 1 depends_on: - eth @@ -359,6 +359,8 @@ services: CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis} TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS CIC_TX_RETRY_DELAY: 15 + BATCH_SIZE: ${RETRIER_BATCH_SIZE:-50} + #DATABASE_DEBUG: 1 depends_on: - eth - postgres @@ -373,7 +375,7 @@ services: - -c - | if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi - ./start_retry.sh -vv + ./start_retry.sh -vv # command: "/root/start_retry.sh -q cic-eth -vv"