From 7a259b52be394f02c47d37c6c877cdcdfc2c7191 Mon Sep 17 00:00:00 2001 From: nolash Date: Tue, 30 Mar 2021 14:20:22 +0200 Subject: [PATCH] Rehabilitate send with higher gas task --- apps/cic-eth/cic_eth/eth/tx.py | 59 ++++++++++++------- .../cic-eth/cic_eth/runnable/daemons/retry.py | 10 ++-- apps/cic-eth/tests/task/test_task_tx.py | 44 ++++++++++++++ docker-compose.yml | 4 +- 4 files changed, 89 insertions(+), 28 deletions(-) diff --git a/apps/cic-eth/cic_eth/eth/tx.py b/apps/cic-eth/cic_eth/eth/tx.py index aef7d537..a930b847 100644 --- a/apps/cic-eth/cic_eth/eth/tx.py +++ b/apps/cic-eth/cic_eth/eth/tx.py @@ -7,7 +7,10 @@ import requests from chainlib.eth.constant import ZERO_ADDRESS from chainlib.chain import ChainSpec from chainlib.eth.address import is_checksum_address -from chainlib.eth.gas import balance +from chainlib.eth.gas import ( + balance, + price, + ) from chainlib.eth.error import ( EthException, NotFoundEthException, @@ -17,11 +20,15 @@ from chainlib.eth.tx import ( receipt, raw, TxFormat, + TxFactory, unpack, ) from chainlib.connection import RPCConnection from chainlib.hash import keccak256_hex_to_hex -from chainlib.eth.gas import Gas +from chainlib.eth.gas import ( + Gas, + OverrideGasOracle, + ) from chainlib.eth.contract import ( abi_decode_single, ABIContractType, @@ -52,6 +59,7 @@ from cic_eth.queue.tx import ( get_tx, register_tx, get_nonce_tx, + create as queue_create, ) from cic_eth.error import OutOfGasError from cic_eth.error import LockedError @@ -370,7 +378,7 @@ def refill_gas(self, recipient_address, chain_spec_dict): @celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask) -def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_factor=1.1): +def resend_with_higher_gas(self, txold_hash_hex, chain_spec_dict, gas=None, default_factor=1.1): """Create a new transaction from an existing one with same nonce and higher gas price. :param txold_hash_hex: Transaction to re-create @@ -394,46 +402,55 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa session.close() raise NotLocalTxError(txold_hash_hex) - chain_spec = ChainSpec.from_chain_str(chain_str) - c = RpcClient(chain_spec) + chain_spec = ChainSpec.from_dict(chain_spec_dict) tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx[2:]) tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id()) logg.debug('resend otx {} {}'.format(tx, otx.signed_tx)) - queue = self.request.delivery_info['routing_key'] + queue = self.request.delivery_info.get('routing_key') logg.debug('before {}'.format(tx)) - if gas != None: - tx['gasPrice'] = gas - else: - gas_price = c.gas_price() - if tx['gasPrice'] > gas_price: - logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(gas_price, tx['gasPrice'])) + + rpc = RPCConnection.connect(chain_spec, 'default') + new_gas_price = gas + if new_gas_price == None: + o = price() + r = rpc.do(o) + current_gas_price = int(r, 16) + if tx['gasPrice'] > current_gas_price: + logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(curent_gas_price, tx['gasPrice'])) #tx['gasPrice'] = int(tx['gasPrice'] * default_factor) - tx['gasPrice'] += 1 + new_gas_price = tx['gasPrice'] + 1 else: new_gas_price = int(tx['gasPrice'] * default_factor) - if gas_price > new_gas_price: - tx['gasPrice'] = gas_price - else: - tx['gasPrice'] = new_gas_price + #if gas_price > new_gas_price: + # tx['gasPrice'] = gas_price + #else: + # tx['gasPrice'] = new_gas_price - (tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str) + + rpc_signer = RPCConnection.connect(chain_spec, 'signer') + gas_oracle = OverrideGasOracle(price=new_gas_price, conn=rpc) + + c = TxFactory(signer=rpc_signer, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id()) + logg.debug('change gas price from old {} to new {} for tx {}'.format(tx['gasPrice'], new_gas_price, tx)) + tx['gasPrice'] = new_gas_price + (tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx) queue_create( tx['nonce'], tx['from'], tx_hash_hex, tx_signed_raw_hex, - chain_str, + chain_spec, session=session, ) TxCache.clone(txold_hash_hex, tx_hash_hex, session=session) session.close() - s = create_check_gas_and_send_task( + s = create_check_gas_task( [tx_signed_raw_hex], - chain_str, + chain_spec, tx['from'], tx['gasPrice'] * tx['gas'], [tx_hash_hex], diff --git a/apps/cic-eth/cic_eth/runnable/daemons/retry.py b/apps/cic-eth/cic_eth/runnable/daemons/retry.py index 8c9deec4..9f87769d 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/retry.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/retry.py @@ -189,7 +189,7 @@ def dispatch(conn, chain_spec): # s_send.apply_async() -class FooFilter: +class StragglerFilter: def __init__(self, chain_spec, queue='cic-eth'): self.chain_spec = chain_spec @@ -198,14 +198,14 @@ class FooFilter: def filter(self, conn, block, tx, db_session=None): s_send = celery.signature( - 'cic_eth.eth.resend_with_higher_gas', + 'cic_eth.eth.tx.resend_with_higher_gas', [ - [tx], + tx, self.chain_spec.asdict(), ], queue=self.queue, ) - s_send.apply_async() + return s_send.apply_async() class RetrySyncer(HeadSyncer): @@ -254,7 +254,7 @@ def main(): #block = conn.do(o) syncer = RetrySyncer(conn, chain_spec, straggler_delay) syncer.backend.set(0, 0) - syncer.add_filter(FooFilter(chain_spec, queue=queue)) + syncer.add_filter(StragglerFilter(chain_spec, queue=queue)) syncer.loop(float(straggler_delay), conn) diff --git a/apps/cic-eth/tests/task/test_task_tx.py b/apps/cic-eth/tests/task/test_task_tx.py index 2f223c54..99d1a3d0 100644 --- a/apps/cic-eth/tests/task/test_task_tx.py +++ b/apps/cic-eth/tests/task/test_task_tx.py @@ -12,10 +12,12 @@ from chainlib.eth.tx import ( transaction, receipt, ) +from hexathon import strip_0x # local imports from cic_eth.queue.tx import register_tx from cic_eth.eth.tx import cache_gas_data +from cic_eth.db.models.otx import Otx logg = logging.getLogger() @@ -60,6 +62,7 @@ def test_tx_send( assert rcpt['status'] == 1 +@pytest.mark.skip() def test_sync_tx( default_chain_spec, eth_rpc, @@ -67,3 +70,44 @@ def test_sync_tx( celery_worker, ): pass + + +def test_resend_with_higher_gas( + init_database, + default_chain_spec, + eth_rpc, + eth_signer, + agent_roles, + celery_worker, + ): + + chain_id = default_chain_spec.chain_id() + nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc) + c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id) + (tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024, tx_format=TxFormat.RLP_SIGNED) + #unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), chain_id) + register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database) + cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) + tx_before = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), default_chain_spec.chain_id()) + + s = celery.signature( + 'cic_eth.eth.tx.resend_with_higher_gas', + [ + tx_hash_hex, + default_chain_spec.asdict(), + ], + queue=None, + ) + t = s.apply_async() + r = t.get_leaf() + + q = init_database.query(Otx) + q = q.filter(Otx.tx_hash==r) + otx = q.first() + if otx == None: + raise NotLocalTxError(r) + + tx_after = unpack(bytes.fromhex(strip_0x(otx.signed_tx)), default_chain_spec.chain_id()) + logg.debug('gasprices before {} after {}'.format(tx_before['gasPrice'], tx_after['gasPrice'])) + assert tx_after['gasPrice'] > tx_before['gasPrice'] + diff --git a/docker-compose.yml b/docker-compose.yml index 5adf741f..23c61a48 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -317,8 +317,8 @@ services: CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis} CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis} TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS - #DATABASE_DEBUG: ${DATABASE_DEBUG:-false} - DATABASE_DEBUG: 1 + DATABASE_DEBUG: ${DATABASE_DEBUG:-false} + #DATABASE_DEBUG: 1 depends_on: - eth