Rehabilitate send with higher gas task

This commit is contained in:
nolash 2021-03-30 14:20:22 +02:00
parent 6f6882e770
commit 7a259b52be
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
4 changed files with 89 additions and 28 deletions

View File

@ -7,7 +7,10 @@ import requests
from chainlib.eth.constant import ZERO_ADDRESS from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.eth.address import is_checksum_address from chainlib.eth.address import is_checksum_address
from chainlib.eth.gas import balance from chainlib.eth.gas import (
balance,
price,
)
from chainlib.eth.error import ( from chainlib.eth.error import (
EthException, EthException,
NotFoundEthException, NotFoundEthException,
@ -17,11 +20,15 @@ from chainlib.eth.tx import (
receipt, receipt,
raw, raw,
TxFormat, TxFormat,
TxFactory,
unpack, unpack,
) )
from chainlib.connection import RPCConnection from chainlib.connection import RPCConnection
from chainlib.hash import keccak256_hex_to_hex from chainlib.hash import keccak256_hex_to_hex
from chainlib.eth.gas import Gas from chainlib.eth.gas import (
Gas,
OverrideGasOracle,
)
from chainlib.eth.contract import ( from chainlib.eth.contract import (
abi_decode_single, abi_decode_single,
ABIContractType, ABIContractType,
@ -52,6 +59,7 @@ from cic_eth.queue.tx import (
get_tx, get_tx,
register_tx, register_tx,
get_nonce_tx, get_nonce_tx,
create as queue_create,
) )
from cic_eth.error import OutOfGasError from cic_eth.error import OutOfGasError
from cic_eth.error import LockedError from cic_eth.error import LockedError
@ -370,7 +378,7 @@ def refill_gas(self, recipient_address, chain_spec_dict):
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask) @celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_factor=1.1): def resend_with_higher_gas(self, txold_hash_hex, chain_spec_dict, gas=None, default_factor=1.1):
"""Create a new transaction from an existing one with same nonce and higher gas price. """Create a new transaction from an existing one with same nonce and higher gas price.
:param txold_hash_hex: Transaction to re-create :param txold_hash_hex: Transaction to re-create
@ -394,46 +402,55 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
session.close() session.close()
raise NotLocalTxError(txold_hash_hex) raise NotLocalTxError(txold_hash_hex)
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_dict(chain_spec_dict)
c = RpcClient(chain_spec)
tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx[2:]) tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx[2:])
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id()) tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id())
logg.debug('resend otx {} {}'.format(tx, otx.signed_tx)) logg.debug('resend otx {} {}'.format(tx, otx.signed_tx))
queue = self.request.delivery_info['routing_key'] queue = self.request.delivery_info.get('routing_key')
logg.debug('before {}'.format(tx)) logg.debug('before {}'.format(tx))
if gas != None:
tx['gasPrice'] = gas rpc = RPCConnection.connect(chain_spec, 'default')
else: new_gas_price = gas
gas_price = c.gas_price() if new_gas_price == None:
if tx['gasPrice'] > gas_price: o = price()
logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(gas_price, tx['gasPrice'])) r = rpc.do(o)
current_gas_price = int(r, 16)
if tx['gasPrice'] > current_gas_price:
logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(curent_gas_price, tx['gasPrice']))
#tx['gasPrice'] = int(tx['gasPrice'] * default_factor) #tx['gasPrice'] = int(tx['gasPrice'] * default_factor)
tx['gasPrice'] += 1 new_gas_price = tx['gasPrice'] + 1
else: else:
new_gas_price = int(tx['gasPrice'] * default_factor) new_gas_price = int(tx['gasPrice'] * default_factor)
if gas_price > new_gas_price: #if gas_price > new_gas_price:
tx['gasPrice'] = gas_price # tx['gasPrice'] = gas_price
else: #else:
tx['gasPrice'] = new_gas_price # tx['gasPrice'] = new_gas_price
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str)
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
gas_oracle = OverrideGasOracle(price=new_gas_price, conn=rpc)
c = TxFactory(signer=rpc_signer, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id())
logg.debug('change gas price from old {} to new {} for tx {}'.format(tx['gasPrice'], new_gas_price, tx))
tx['gasPrice'] = new_gas_price
(tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx)
queue_create( queue_create(
tx['nonce'], tx['nonce'],
tx['from'], tx['from'],
tx_hash_hex, tx_hash_hex,
tx_signed_raw_hex, tx_signed_raw_hex,
chain_str, chain_spec,
session=session, session=session,
) )
TxCache.clone(txold_hash_hex, tx_hash_hex, session=session) TxCache.clone(txold_hash_hex, tx_hash_hex, session=session)
session.close() session.close()
s = create_check_gas_and_send_task( s = create_check_gas_task(
[tx_signed_raw_hex], [tx_signed_raw_hex],
chain_str, chain_spec,
tx['from'], tx['from'],
tx['gasPrice'] * tx['gas'], tx['gasPrice'] * tx['gas'],
[tx_hash_hex], [tx_hash_hex],

View File

@ -189,7 +189,7 @@ def dispatch(conn, chain_spec):
# s_send.apply_async() # s_send.apply_async()
class FooFilter: class StragglerFilter:
def __init__(self, chain_spec, queue='cic-eth'): def __init__(self, chain_spec, queue='cic-eth'):
self.chain_spec = chain_spec self.chain_spec = chain_spec
@ -198,14 +198,14 @@ class FooFilter:
def filter(self, conn, block, tx, db_session=None): def filter(self, conn, block, tx, db_session=None):
s_send = celery.signature( s_send = celery.signature(
'cic_eth.eth.resend_with_higher_gas', 'cic_eth.eth.tx.resend_with_higher_gas',
[ [
[tx], tx,
self.chain_spec.asdict(), self.chain_spec.asdict(),
], ],
queue=self.queue, queue=self.queue,
) )
s_send.apply_async() return s_send.apply_async()
class RetrySyncer(HeadSyncer): class RetrySyncer(HeadSyncer):
@ -254,7 +254,7 @@ def main():
#block = conn.do(o) #block = conn.do(o)
syncer = RetrySyncer(conn, chain_spec, straggler_delay) syncer = RetrySyncer(conn, chain_spec, straggler_delay)
syncer.backend.set(0, 0) syncer.backend.set(0, 0)
syncer.add_filter(FooFilter(chain_spec, queue=queue)) syncer.add_filter(StragglerFilter(chain_spec, queue=queue))
syncer.loop(float(straggler_delay), conn) syncer.loop(float(straggler_delay), conn)

View File

@ -12,10 +12,12 @@ from chainlib.eth.tx import (
transaction, transaction,
receipt, receipt,
) )
from hexathon import strip_0x
# local imports # local imports
from cic_eth.queue.tx import register_tx from cic_eth.queue.tx import register_tx
from cic_eth.eth.tx import cache_gas_data from cic_eth.eth.tx import cache_gas_data
from cic_eth.db.models.otx import Otx
logg = logging.getLogger() logg = logging.getLogger()
@ -60,6 +62,7 @@ def test_tx_send(
assert rcpt['status'] == 1 assert rcpt['status'] == 1
@pytest.mark.skip()
def test_sync_tx( def test_sync_tx(
default_chain_spec, default_chain_spec,
eth_rpc, eth_rpc,
@ -67,3 +70,44 @@ def test_sync_tx(
celery_worker, celery_worker,
): ):
pass pass
def test_resend_with_higher_gas(
init_database,
default_chain_spec,
eth_rpc,
eth_signer,
agent_roles,
celery_worker,
):
chain_id = default_chain_spec.chain_id()
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id)
(tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024, tx_format=TxFormat.RLP_SIGNED)
#unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), chain_id)
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
tx_before = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), default_chain_spec.chain_id())
s = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas',
[
tx_hash_hex,
default_chain_spec.asdict(),
],
queue=None,
)
t = s.apply_async()
r = t.get_leaf()
q = init_database.query(Otx)
q = q.filter(Otx.tx_hash==r)
otx = q.first()
if otx == None:
raise NotLocalTxError(r)
tx_after = unpack(bytes.fromhex(strip_0x(otx.signed_tx)), default_chain_spec.chain_id())
logg.debug('gasprices before {} after {}'.format(tx_before['gasPrice'], tx_after['gasPrice']))
assert tx_after['gasPrice'] > tx_before['gasPrice']

View File

@ -317,8 +317,8 @@ services:
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis} CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis}
CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis} CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis}
TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS
#DATABASE_DEBUG: ${DATABASE_DEBUG:-false} DATABASE_DEBUG: ${DATABASE_DEBUG:-false}
DATABASE_DEBUG: 1 #DATABASE_DEBUG: 1
depends_on: depends_on:
- eth - eth