Merge branch 'lash/rehabilitate-retrier' into 'master'
Rehabilitate retrier See merge request grassrootseconomics/cic-internal-integration!77
This commit is contained in:
commit
68bdadcdf1
@ -134,7 +134,8 @@ class AdminApi:
|
|||||||
return s_have.apply_async()
|
return s_have.apply_async()
|
||||||
|
|
||||||
|
|
||||||
def resend(self, tx_hash_hex, chain_str, in_place=True, unlock=False):
|
def resend(self, tx_hash_hex, chain_spec, in_place=True, unlock=False):
|
||||||
|
|
||||||
logg.debug('resend {}'.format(tx_hash_hex))
|
logg.debug('resend {}'.format(tx_hash_hex))
|
||||||
s_get_tx_cache = celery.signature(
|
s_get_tx_cache = celery.signature(
|
||||||
'cic_eth.queue.tx.get_tx_cache',
|
'cic_eth.queue.tx.get_tx_cache',
|
||||||
@ -156,7 +157,7 @@ class AdminApi:
|
|||||||
s = celery.signature(
|
s = celery.signature(
|
||||||
'cic_eth.eth.tx.resend_with_higher_gas',
|
'cic_eth.eth.tx.resend_with_higher_gas',
|
||||||
[
|
[
|
||||||
chain_str,
|
chain_spec.asdict(),
|
||||||
None,
|
None,
|
||||||
1.01,
|
1.01,
|
||||||
],
|
],
|
||||||
@ -176,7 +177,7 @@ class AdminApi:
|
|||||||
s_gas = celery.signature(
|
s_gas = celery.signature(
|
||||||
'cic_eth.admin.ctrl.unlock_send',
|
'cic_eth.admin.ctrl.unlock_send',
|
||||||
[
|
[
|
||||||
chain_str,
|
chain_spec.asdict(),
|
||||||
tx_dict['sender'],
|
tx_dict['sender'],
|
||||||
],
|
],
|
||||||
queue=self.queue,
|
queue=self.queue,
|
||||||
@ -218,7 +219,7 @@ class AdminApi:
|
|||||||
blocking_tx = k
|
blocking_tx = k
|
||||||
blocking_nonce = nonce_otx
|
blocking_nonce = nonce_otx
|
||||||
elif nonce_otx - last_nonce > 1:
|
elif nonce_otx - last_nonce > 1:
|
||||||
logg.error('nonce gap; {} followed {}'.format(nonce_otx, last_nonce))
|
logg.error('nonce gap; {} followed {} for account {}'.format(nonce_otx, last_nonce, tx['from']))
|
||||||
blocking_tx = k
|
blocking_tx = k
|
||||||
blocking_nonce = nonce_otx
|
blocking_nonce = nonce_otx
|
||||||
break
|
break
|
||||||
@ -312,10 +313,10 @@ class AdminApi:
|
|||||||
tx_dict = s.apply_async().get()
|
tx_dict = s.apply_async().get()
|
||||||
if tx_dict['sender'] == address:
|
if tx_dict['sender'] == address:
|
||||||
if tx_dict['nonce'] - last_nonce > 1:
|
if tx_dict['nonce'] - last_nonce > 1:
|
||||||
logg.error('nonce gap; {} followed {} for tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['hash']))
|
logg.error('nonce gap; {} followed {} for address {} tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['sender'], tx_hash))
|
||||||
errors.append('nonce')
|
errors.append('nonce')
|
||||||
elif tx_dict['nonce'] == last_nonce:
|
elif tx_dict['nonce'] == last_nonce:
|
||||||
logg.warning('nonce {} duplicate in tx {}'.format(tx_dict['nonce'], tx_dict['hash']))
|
logg.info('nonce {} duplicate for address {} in tx {}'.format(tx_dict['nonce'], tx_dict['sender'], tx_hash))
|
||||||
last_nonce = tx_dict['nonce']
|
last_nonce = tx_dict['nonce']
|
||||||
if not include_sender:
|
if not include_sender:
|
||||||
logg.debug('skipping sender tx {}'.format(tx_dict['tx_hash']))
|
logg.debug('skipping sender tx {}'.format(tx_dict['tx_hash']))
|
||||||
@ -480,15 +481,17 @@ class AdminApi:
|
|||||||
tx['destination_token_symbol'] = destination_token.symbol()
|
tx['destination_token_symbol'] = destination_token.symbol()
|
||||||
tx['recipient_token_balance'] = source_token.function('balanceOf')(tx['recipient']).call()
|
tx['recipient_token_balance'] = source_token.function('balanceOf')(tx['recipient']).call()
|
||||||
|
|
||||||
tx['network_status'] = 'Not submitted'
|
# TODO: this can mean either not subitted or culled, need to check other txs with same nonce to determine which
|
||||||
|
tx['network_status'] = 'Not in node'
|
||||||
|
|
||||||
r = None
|
r = None
|
||||||
try:
|
try:
|
||||||
o = transaction(tx_hash)
|
o = transaction(tx_hash)
|
||||||
r = self.rpc.do(o)
|
r = self.rpc.do(o)
|
||||||
|
if r != None:
|
||||||
|
tx['network_status'] = 'Mempool'
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logg.warning('(too permissive exception handler, please fix!) {}'.format(e))
|
logg.warning('(too permissive exception handler, please fix!) {}'.format(e))
|
||||||
tx['network_status'] = 'Mempool'
|
|
||||||
|
|
||||||
if r != None:
|
if r != None:
|
||||||
try:
|
try:
|
||||||
|
@ -7,7 +7,10 @@ import requests
|
|||||||
from chainlib.eth.constant import ZERO_ADDRESS
|
from chainlib.eth.constant import ZERO_ADDRESS
|
||||||
from chainlib.chain import ChainSpec
|
from chainlib.chain import ChainSpec
|
||||||
from chainlib.eth.address import is_checksum_address
|
from chainlib.eth.address import is_checksum_address
|
||||||
from chainlib.eth.gas import balance
|
from chainlib.eth.gas import (
|
||||||
|
balance,
|
||||||
|
price,
|
||||||
|
)
|
||||||
from chainlib.eth.error import (
|
from chainlib.eth.error import (
|
||||||
EthException,
|
EthException,
|
||||||
NotFoundEthException,
|
NotFoundEthException,
|
||||||
@ -17,11 +20,15 @@ from chainlib.eth.tx import (
|
|||||||
receipt,
|
receipt,
|
||||||
raw,
|
raw,
|
||||||
TxFormat,
|
TxFormat,
|
||||||
|
TxFactory,
|
||||||
unpack,
|
unpack,
|
||||||
)
|
)
|
||||||
from chainlib.connection import RPCConnection
|
from chainlib.connection import RPCConnection
|
||||||
from chainlib.hash import keccak256_hex_to_hex
|
from chainlib.hash import keccak256_hex_to_hex
|
||||||
from chainlib.eth.gas import Gas
|
from chainlib.eth.gas import (
|
||||||
|
Gas,
|
||||||
|
OverrideGasOracle,
|
||||||
|
)
|
||||||
from chainlib.eth.contract import (
|
from chainlib.eth.contract import (
|
||||||
abi_decode_single,
|
abi_decode_single,
|
||||||
ABIContractType,
|
ABIContractType,
|
||||||
@ -52,6 +59,7 @@ from cic_eth.queue.tx import (
|
|||||||
get_tx,
|
get_tx,
|
||||||
register_tx,
|
register_tx,
|
||||||
get_nonce_tx,
|
get_nonce_tx,
|
||||||
|
create as queue_create,
|
||||||
)
|
)
|
||||||
from cic_eth.error import OutOfGasError
|
from cic_eth.error import OutOfGasError
|
||||||
from cic_eth.error import LockedError
|
from cic_eth.error import LockedError
|
||||||
@ -370,7 +378,7 @@ def refill_gas(self, recipient_address, chain_spec_dict):
|
|||||||
|
|
||||||
|
|
||||||
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
|
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
|
||||||
def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_factor=1.1):
|
def resend_with_higher_gas(self, txold_hash_hex, chain_spec_dict, gas=None, default_factor=1.1):
|
||||||
"""Create a new transaction from an existing one with same nonce and higher gas price.
|
"""Create a new transaction from an existing one with same nonce and higher gas price.
|
||||||
|
|
||||||
:param txold_hash_hex: Transaction to re-create
|
:param txold_hash_hex: Transaction to re-create
|
||||||
@ -394,46 +402,55 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
|
|||||||
session.close()
|
session.close()
|
||||||
raise NotLocalTxError(txold_hash_hex)
|
raise NotLocalTxError(txold_hash_hex)
|
||||||
|
|
||||||
chain_spec = ChainSpec.from_chain_str(chain_str)
|
chain_spec = ChainSpec.from_dict(chain_spec_dict)
|
||||||
c = RpcClient(chain_spec)
|
|
||||||
|
|
||||||
tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx[2:])
|
tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx[2:])
|
||||||
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id())
|
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id())
|
||||||
logg.debug('resend otx {} {}'.format(tx, otx.signed_tx))
|
logg.debug('resend otx {} {}'.format(tx, otx.signed_tx))
|
||||||
|
|
||||||
queue = self.request.delivery_info['routing_key']
|
queue = self.request.delivery_info.get('routing_key')
|
||||||
|
|
||||||
logg.debug('before {}'.format(tx))
|
logg.debug('before {}'.format(tx))
|
||||||
if gas != None:
|
|
||||||
tx['gasPrice'] = gas
|
rpc = RPCConnection.connect(chain_spec, 'default')
|
||||||
else:
|
new_gas_price = gas
|
||||||
gas_price = c.gas_price()
|
if new_gas_price == None:
|
||||||
if tx['gasPrice'] > gas_price:
|
o = price()
|
||||||
logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(gas_price, tx['gasPrice']))
|
r = rpc.do(o)
|
||||||
|
current_gas_price = int(r, 16)
|
||||||
|
if tx['gasPrice'] > current_gas_price:
|
||||||
|
logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(curent_gas_price, tx['gasPrice']))
|
||||||
#tx['gasPrice'] = int(tx['gasPrice'] * default_factor)
|
#tx['gasPrice'] = int(tx['gasPrice'] * default_factor)
|
||||||
tx['gasPrice'] += 1
|
new_gas_price = tx['gasPrice'] + 1
|
||||||
else:
|
else:
|
||||||
new_gas_price = int(tx['gasPrice'] * default_factor)
|
new_gas_price = int(tx['gasPrice'] * default_factor)
|
||||||
if gas_price > new_gas_price:
|
#if gas_price > new_gas_price:
|
||||||
tx['gasPrice'] = gas_price
|
# tx['gasPrice'] = gas_price
|
||||||
else:
|
#else:
|
||||||
tx['gasPrice'] = new_gas_price
|
# tx['gasPrice'] = new_gas_price
|
||||||
|
|
||||||
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str)
|
|
||||||
|
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
|
||||||
|
gas_oracle = OverrideGasOracle(price=new_gas_price, conn=rpc)
|
||||||
|
|
||||||
|
c = TxFactory(signer=rpc_signer, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id())
|
||||||
|
logg.debug('change gas price from old {} to new {} for tx {}'.format(tx['gasPrice'], new_gas_price, tx))
|
||||||
|
tx['gasPrice'] = new_gas_price
|
||||||
|
(tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx)
|
||||||
queue_create(
|
queue_create(
|
||||||
tx['nonce'],
|
tx['nonce'],
|
||||||
tx['from'],
|
tx['from'],
|
||||||
tx_hash_hex,
|
tx_hash_hex,
|
||||||
tx_signed_raw_hex,
|
tx_signed_raw_hex,
|
||||||
chain_str,
|
chain_spec,
|
||||||
session=session,
|
session=session,
|
||||||
)
|
)
|
||||||
TxCache.clone(txold_hash_hex, tx_hash_hex, session=session)
|
TxCache.clone(txold_hash_hex, tx_hash_hex, session=session)
|
||||||
session.close()
|
session.close()
|
||||||
|
|
||||||
s = create_check_gas_and_send_task(
|
s = create_check_gas_task(
|
||||||
[tx_signed_raw_hex],
|
[tx_signed_raw_hex],
|
||||||
chain_str,
|
chain_spec,
|
||||||
tx['from'],
|
tx['from'],
|
||||||
tx['gasPrice'] * tx['gas'],
|
tx['gasPrice'] * tx['gas'],
|
||||||
[tx_hash_hex],
|
[tx_hash_hex],
|
||||||
|
@ -676,6 +676,7 @@ def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, se
|
|||||||
q = q.filter(Otx.status.op('&')(status)>0)
|
q = q.filter(Otx.status.op('&')(status)>0)
|
||||||
if not_status != None:
|
if not_status != None:
|
||||||
q = q.filter(Otx.status.op('&')(not_status)==0)
|
q = q.filter(Otx.status.op('&')(not_status)==0)
|
||||||
|
q = q.order_by(Otx.nonce.asc(), Otx.date_created.asc())
|
||||||
i = 0
|
i = 0
|
||||||
for o in q.all():
|
for o in q.all():
|
||||||
if limit > 0 and i == limit:
|
if limit > 0 and i == limit:
|
||||||
@ -687,7 +688,7 @@ def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, se
|
|||||||
|
|
||||||
|
|
||||||
# TODO: move query to model
|
# TODO: move query to model
|
||||||
def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, chain_id=0, session=None):
|
def get_upcoming_tx(status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, chain_id=0, session=None):
|
||||||
"""Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions.
|
"""Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions.
|
||||||
|
|
||||||
Will omit addresses that have the LockEnum.SEND bit in Lock set.
|
Will omit addresses that have the LockEnum.SEND bit in Lock set.
|
||||||
@ -721,7 +722,10 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
|
|||||||
if status == StatusEnum.PENDING:
|
if status == StatusEnum.PENDING:
|
||||||
q_outer = q_outer.filter(Otx.status==status.value)
|
q_outer = q_outer.filter(Otx.status==status.value)
|
||||||
else:
|
else:
|
||||||
q_outer = q_outer.filter(Otx.status.op('&')(status.value)==status.value)
|
q_outer = q_outer.filter(Otx.status.op('&')(status)==status)
|
||||||
|
|
||||||
|
if not_status != None:
|
||||||
|
q_outer = q_outer.filter(Otx.status.op('&')(not_status)==0)
|
||||||
|
|
||||||
if recipient != None:
|
if recipient != None:
|
||||||
q_outer = q_outer.filter(TxCache.recipient==recipient)
|
q_outer = q_outer.filter(TxCache.recipient==recipient)
|
||||||
@ -730,6 +734,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
|
|||||||
|
|
||||||
txs = {}
|
txs = {}
|
||||||
|
|
||||||
|
i = 0
|
||||||
for r in q_outer.all():
|
for r in q_outer.all():
|
||||||
q = session.query(Otx)
|
q = session.query(Otx)
|
||||||
q = q.join(TxCache)
|
q = q.join(TxCache)
|
||||||
@ -758,6 +763,10 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
|
|||||||
session.add(o)
|
session.add(o)
|
||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
|
i += 1
|
||||||
|
if limit > 0 and limit == i:
|
||||||
|
break
|
||||||
|
|
||||||
SessionBase.release_session(session)
|
SessionBase.release_session(session)
|
||||||
|
|
||||||
return txs
|
return txs
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
# standard imports
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import logging
|
import logging
|
||||||
@ -5,22 +6,36 @@ import argparse
|
|||||||
import re
|
import re
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
import web3
|
# external imports
|
||||||
import confini
|
import confini
|
||||||
import celery
|
import celery
|
||||||
from cic_eth_registry import CICRegistry
|
from cic_eth_registry import CICRegistry
|
||||||
from chainlib.chain import ChainSpec
|
from chainlib.chain import ChainSpec
|
||||||
|
from chainlib.eth.tx import unpack
|
||||||
|
from chainlib.connection import RPCConnection
|
||||||
|
from chainlib.eth.block import (
|
||||||
|
block_latest,
|
||||||
|
block_by_number,
|
||||||
|
Block,
|
||||||
|
)
|
||||||
|
from chainsyncer.driver import HeadSyncer
|
||||||
|
from chainsyncer.backend import MemBackend
|
||||||
|
from chainsyncer.error import NoBlockForYou
|
||||||
|
|
||||||
|
# local imports
|
||||||
from cic_eth.db import dsn_from_config
|
from cic_eth.db import dsn_from_config
|
||||||
from cic_eth.db import SessionBase
|
from cic_eth.db import SessionBase
|
||||||
from cic_eth.eth import RpcClient
|
from cic_eth.queue.tx import (
|
||||||
from cic_eth.sync.retry import RetrySyncer
|
get_status_tx,
|
||||||
from cic_eth.queue.tx import get_status_tx
|
get_tx,
|
||||||
from cic_eth.queue.tx import get_tx
|
# get_upcoming_tx,
|
||||||
|
)
|
||||||
from cic_eth.admin.ctrl import lock_send
|
from cic_eth.admin.ctrl import lock_send
|
||||||
from cic_eth.db.enum import StatusEnum
|
from cic_eth.db.enum import (
|
||||||
from cic_eth.db.enum import LockEnum
|
StatusEnum,
|
||||||
from cic_eth.eth.util import unpack_signed_raw_tx_hex
|
StatusBits,
|
||||||
|
LockEnum,
|
||||||
|
)
|
||||||
|
|
||||||
logging.basicConfig(level=logging.WARNING)
|
logging.basicConfig(level=logging.WARNING)
|
||||||
logg = logging.getLogger()
|
logg = logging.getLogger()
|
||||||
@ -31,7 +46,8 @@ argparser = argparse.ArgumentParser(description='daemon that monitors transactio
|
|||||||
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
|
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
|
||||||
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
|
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
|
||||||
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
|
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
|
||||||
argparser.add_argument('--retry-delay', dest='retry_delay', type=str, help='seconds to wait for retrying a transaction that is marked as sent')
|
argparser.add_argument('--batch-size', dest='batch_size', type=int, default=50, help='max amount of txs to resend per iteration')
|
||||||
|
argparser.add_argument('--retry-delay', dest='retry_delay', type=int, help='seconds to wait for retrying a transaction that is marked as sent')
|
||||||
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
|
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
|
||||||
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
|
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
|
||||||
argparser.add_argument('-v', help='be verbose', action='store_true')
|
argparser.add_argument('-v', help='be verbose', action='store_true')
|
||||||
@ -51,7 +67,6 @@ config.process()
|
|||||||
# override args
|
# override args
|
||||||
args_override = {
|
args_override = {
|
||||||
'ETH_PROVIDER': getattr(args, 'p'),
|
'ETH_PROVIDER': getattr(args, 'p'),
|
||||||
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
|
|
||||||
'CIC_CHAIN_SPEC': getattr(args, 'i'),
|
'CIC_CHAIN_SPEC': getattr(args, 'i'),
|
||||||
'CIC_TX_RETRY_DELAY': getattr(args, 'retry_delay'),
|
'CIC_TX_RETRY_DELAY': getattr(args, 'retry_delay'),
|
||||||
}
|
}
|
||||||
@ -59,6 +74,7 @@ config.dict_override(args_override, 'cli flag')
|
|||||||
config.censor('PASSWORD', 'DATABASE')
|
config.censor('PASSWORD', 'DATABASE')
|
||||||
config.censor('PASSWORD', 'SSL')
|
config.censor('PASSWORD', 'SSL')
|
||||||
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
|
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
|
||||||
|
config.add(args.batch_size, '_BATCH_SIZE', True)
|
||||||
|
|
||||||
app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
|
app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
|
||||||
|
|
||||||
@ -66,10 +82,10 @@ queue = args.q
|
|||||||
|
|
||||||
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
|
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
|
||||||
|
|
||||||
RPCConnection.registry_location(args.p, chain_spec, tag='default')
|
RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default')
|
||||||
|
|
||||||
dsn = dsn_from_config(config)
|
dsn = dsn_from_config(config)
|
||||||
SessionBase.connect(dsn)
|
SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
|
||||||
|
|
||||||
straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))
|
straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))
|
||||||
|
|
||||||
@ -178,53 +194,85 @@ def dispatch(conn, chain_spec):
|
|||||||
# s_send.apply_async()
|
# s_send.apply_async()
|
||||||
|
|
||||||
|
|
||||||
class RetrySyncer(Syncer):
|
class StragglerFilter:
|
||||||
|
|
||||||
def __init__(self, chain_spec, stalled_grace_seconds, failed_grace_seconds=None, final_func=None):
|
def __init__(self, chain_spec, queue='cic-eth'):
|
||||||
|
self.chain_spec = chain_spec
|
||||||
|
self.queue = queue
|
||||||
|
|
||||||
|
|
||||||
|
def filter(self, conn, block, tx, db_session=None):
|
||||||
|
logg.debug('tx {}'.format(tx))
|
||||||
|
s_send = celery.signature(
|
||||||
|
'cic_eth.eth.tx.resend_with_higher_gas',
|
||||||
|
[
|
||||||
|
tx,
|
||||||
|
self.chain_spec.asdict(),
|
||||||
|
],
|
||||||
|
queue=self.queue,
|
||||||
|
)
|
||||||
|
return s_send.apply_async()
|
||||||
|
#return s_send
|
||||||
|
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return 'stragglerfilter'
|
||||||
|
|
||||||
|
|
||||||
|
class RetrySyncer(HeadSyncer):
|
||||||
|
|
||||||
|
def __init__(self, conn, chain_spec, stalled_grace_seconds, batch_size=50, failed_grace_seconds=None):
|
||||||
|
backend = MemBackend(chain_spec, None)
|
||||||
|
super(RetrySyncer, self).__init__(backend)
|
||||||
self.chain_spec = chain_spec
|
self.chain_spec = chain_spec
|
||||||
if failed_grace_seconds == None:
|
if failed_grace_seconds == None:
|
||||||
failed_grace_seconds = stalled_grace_seconds
|
failed_grace_seconds = stalled_grace_seconds
|
||||||
self.stalled_grace_seconds = stalled_grace_seconds
|
self.stalled_grace_seconds = stalled_grace_seconds
|
||||||
self.failed_grace_seconds = failed_grace_seconds
|
self.failed_grace_seconds = failed_grace_seconds
|
||||||
self.final_func = final_func
|
self.batch_size = batch_size
|
||||||
|
self.conn = conn
|
||||||
|
|
||||||
|
|
||||||
def get(self):
|
def get(self, conn):
|
||||||
# before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.failed_grace_seconds)
|
o = block_latest()
|
||||||
# failed_txs = get_status_tx(
|
r = conn.do(o)
|
||||||
# StatusEnum.SENDFAIL.value,
|
(pair, flags) = self.backend.get()
|
||||||
# before=before,
|
n = int(r, 16)
|
||||||
# )
|
if n == pair[0]:
|
||||||
before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds)
|
raise NoBlockForYou('block {} already checked'.format(n))
|
||||||
stalled_txs = get_status_tx(
|
o = block_by_number(n)
|
||||||
StatusBits.IN_NETWORK.value,
|
r = conn.do(o)
|
||||||
not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE,
|
b = Block(r)
|
||||||
before=before,
|
return b
|
||||||
)
|
|
||||||
# return list(failed_txs.keys()) + list(stalled_txs.keys())
|
|
||||||
return stalled_txs
|
|
||||||
|
|
||||||
def process(self, conn, ref):
|
|
||||||
logg.debug('tx {}'.format(ref))
|
|
||||||
for f in self.filter:
|
|
||||||
f(conn, ref, None, str(self.chain_spec))
|
|
||||||
|
|
||||||
|
|
||||||
|
def process(self, conn, block):
|
||||||
|
before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds)
|
||||||
|
stalled_txs = get_status_tx(
|
||||||
|
StatusBits.IN_NETWORK.value,
|
||||||
|
not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE,
|
||||||
|
before=before,
|
||||||
|
limit=self.batch_size,
|
||||||
|
)
|
||||||
|
# stalled_txs = get_upcoming_tx(
|
||||||
|
# status=StatusBits.IN_NETWORK.value,
|
||||||
|
# not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE,
|
||||||
|
# before=before,
|
||||||
|
# limit=self.batch_size,
|
||||||
|
# )
|
||||||
|
for tx in stalled_txs:
|
||||||
|
self.filter.apply(self.conn, block, tx)
|
||||||
|
self.backend.set(block.number, 0)
|
||||||
|
|
||||||
def loop(self, interval):
|
|
||||||
while self.running and Syncer.running_global:
|
|
||||||
rpc = RPCConnection.connect(self.chain_spec, 'default')
|
|
||||||
for tx in self.get():
|
|
||||||
self.process(rpc, tx)
|
|
||||||
if self.final_func != None:
|
|
||||||
self.final_func(rpc, self.chain_spec)
|
|
||||||
time.sleep(interval)
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
#o = block_latest()
|
||||||
syncer = RetrySyncer(chain_spec, straggler_delay, final_func=dispatch)
|
conn = RPCConnection.connect(chain_spec, 'default')
|
||||||
syncer.filter.append(sendfail_filter)
|
#block = conn.do(o)
|
||||||
syncer.loop(float(straggler_delay))
|
syncer = RetrySyncer(conn, chain_spec, straggler_delay, batch_size=config.get('_BATCH_SIZE'))
|
||||||
|
syncer.backend.set(0, 0)
|
||||||
|
syncer.add_filter(StragglerFilter(chain_spec, queue=queue))
|
||||||
|
syncer.loop(float(straggler_delay), conn)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
@ -7,13 +7,10 @@ import os
|
|||||||
# third-party imports
|
# third-party imports
|
||||||
import celery
|
import celery
|
||||||
import confini
|
import confini
|
||||||
import web3
|
from chainlib.chain import ChainSpec
|
||||||
from cic_registry import CICRegistry
|
from chainlib.eth.connection import EthHTTPConnection
|
||||||
from cic_registry.chain import ChainSpec
|
|
||||||
from cic_registry.chain import ChainRegistry
|
|
||||||
|
|
||||||
# local imports
|
# local imports
|
||||||
from cic_eth.eth.rpc import RpcClient
|
|
||||||
from cic_eth.api.api_admin import AdminApi
|
from cic_eth.api.api_admin import AdminApi
|
||||||
|
|
||||||
logging.basicConfig(level=logging.WARNING)
|
logging.basicConfig(level=logging.WARNING)
|
||||||
@ -55,41 +52,20 @@ args_override = {
|
|||||||
config.censor('PASSWORD', 'DATABASE')
|
config.censor('PASSWORD', 'DATABASE')
|
||||||
config.censor('PASSWORD', 'SSL')
|
config.censor('PASSWORD', 'SSL')
|
||||||
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
|
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
|
||||||
|
config.add(args.tx_hash, '_TX_HASH', True)
|
||||||
|
config.add(args.unlock, '_UNLOCK', True)
|
||||||
|
|
||||||
chain_spec = ChainSpec.from_chain_str(args.i)
|
chain_spec = ChainSpec.from_chain_str(args.i)
|
||||||
chain_str = str(chain_spec)
|
|
||||||
|
|
||||||
re_websocket = re.compile('^wss?://')
|
rpc = EthHTTPConnection(config.get('ETH_PROVIDER'))
|
||||||
re_http = re.compile('^https?://')
|
|
||||||
blockchain_provider = config.get('ETH_PROVIDER')
|
|
||||||
if re.match(re_websocket, blockchain_provider) != None:
|
|
||||||
blockchain_provider = web3.Web3.WebsocketProvider(blockchain_provider)
|
|
||||||
elif re.match(re_http, blockchain_provider) != None:
|
|
||||||
blockchain_provider = web3.Web3.HTTPProvider(blockchain_provider)
|
|
||||||
else:
|
|
||||||
raise ValueError('unknown provider url {}'.format(blockchain_provider))
|
|
||||||
|
|
||||||
def web3_constructor():
|
|
||||||
w3 = web3.Web3(blockchain_provider)
|
|
||||||
return (blockchain_provider, w3)
|
|
||||||
RpcClient.set_constructor(web3_constructor)
|
|
||||||
|
|
||||||
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
|
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
|
||||||
|
|
||||||
c = RpcClient(chain_spec)
|
|
||||||
|
|
||||||
CICRegistry.init(c.w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
|
|
||||||
chain_registry = ChainRegistry(chain_spec)
|
|
||||||
CICRegistry.add_chain_registry(chain_registry)
|
|
||||||
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
|
|
||||||
CICRegistry.load_for(chain_spec)
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
api = AdminApi(c)
|
api = AdminApi(rpc)
|
||||||
tx_details = api.tx(chain_spec, args.tx_hash)
|
tx_details = api.tx(chain_spec, args.tx_hash)
|
||||||
t = api.resend(args.tx_hash, chain_str, unlock=True)
|
t = api.resend(args.tx_hash, chain_spec, unlock=config.get('_UNLOCK'))
|
||||||
|
print(t.get_leaf())
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main()
|
main()
|
||||||
|
@ -12,10 +12,12 @@ from chainlib.eth.tx import (
|
|||||||
transaction,
|
transaction,
|
||||||
receipt,
|
receipt,
|
||||||
)
|
)
|
||||||
|
from hexathon import strip_0x
|
||||||
|
|
||||||
# local imports
|
# local imports
|
||||||
from cic_eth.queue.tx import register_tx
|
from cic_eth.queue.tx import register_tx
|
||||||
from cic_eth.eth.tx import cache_gas_data
|
from cic_eth.eth.tx import cache_gas_data
|
||||||
|
from cic_eth.db.models.otx import Otx
|
||||||
|
|
||||||
logg = logging.getLogger()
|
logg = logging.getLogger()
|
||||||
|
|
||||||
@ -60,6 +62,7 @@ def test_tx_send(
|
|||||||
assert rcpt['status'] == 1
|
assert rcpt['status'] == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skip()
|
||||||
def test_sync_tx(
|
def test_sync_tx(
|
||||||
default_chain_spec,
|
default_chain_spec,
|
||||||
eth_rpc,
|
eth_rpc,
|
||||||
@ -67,3 +70,44 @@ def test_sync_tx(
|
|||||||
celery_worker,
|
celery_worker,
|
||||||
):
|
):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def test_resend_with_higher_gas(
|
||||||
|
init_database,
|
||||||
|
default_chain_spec,
|
||||||
|
eth_rpc,
|
||||||
|
eth_signer,
|
||||||
|
agent_roles,
|
||||||
|
celery_worker,
|
||||||
|
):
|
||||||
|
|
||||||
|
chain_id = default_chain_spec.chain_id()
|
||||||
|
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
|
||||||
|
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id)
|
||||||
|
(tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024, tx_format=TxFormat.RLP_SIGNED)
|
||||||
|
#unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), chain_id)
|
||||||
|
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
|
||||||
|
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
|
||||||
|
tx_before = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), default_chain_spec.chain_id())
|
||||||
|
|
||||||
|
s = celery.signature(
|
||||||
|
'cic_eth.eth.tx.resend_with_higher_gas',
|
||||||
|
[
|
||||||
|
tx_hash_hex,
|
||||||
|
default_chain_spec.asdict(),
|
||||||
|
],
|
||||||
|
queue=None,
|
||||||
|
)
|
||||||
|
t = s.apply_async()
|
||||||
|
r = t.get_leaf()
|
||||||
|
|
||||||
|
q = init_database.query(Otx)
|
||||||
|
q = q.filter(Otx.tx_hash==r)
|
||||||
|
otx = q.first()
|
||||||
|
if otx == None:
|
||||||
|
raise NotLocalTxError(r)
|
||||||
|
|
||||||
|
tx_after = unpack(bytes.fromhex(strip_0x(otx.signed_tx)), default_chain_spec.chain_id())
|
||||||
|
logg.debug('gasprices before {} after {}'.format(tx_before['gasPrice'], tx_after['gasPrice']))
|
||||||
|
assert tx_after['gasPrice'] > tx_before['gasPrice']
|
||||||
|
|
||||||
|
@ -36,7 +36,7 @@ argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis
|
|||||||
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
|
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
|
||||||
argparser.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback')
|
argparser.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback')
|
||||||
argparser.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback')
|
argparser.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback')
|
||||||
argparser.add_argument('--batch-size', dest='batch_size', default=50, type=int, help='burst size of sending transactions to node')
|
argparser.add_argument('--batch-size', dest='batch_size', default=80, type=int, help='burst size of sending transactions to node')
|
||||||
argparser.add_argument('--batch-delay', dest='batch_delay', default=2, type=int, help='seconds delay between batches')
|
argparser.add_argument('--batch-delay', dest='batch_delay', default=2, type=int, help='seconds delay between batches')
|
||||||
argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout')
|
argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout')
|
||||||
argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
|
argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')
|
||||||
|
@ -317,8 +317,8 @@ services:
|
|||||||
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis}
|
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis}
|
||||||
CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis}
|
CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis}
|
||||||
TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS
|
TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS
|
||||||
#DATABASE_DEBUG: ${DATABASE_DEBUG:-false}
|
DATABASE_DEBUG: ${DATABASE_DEBUG:-false}
|
||||||
DATABASE_DEBUG: 1
|
#DATABASE_DEBUG: 1
|
||||||
|
|
||||||
depends_on:
|
depends_on:
|
||||||
- eth
|
- eth
|
||||||
@ -359,6 +359,8 @@ services:
|
|||||||
CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis}
|
CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis}
|
||||||
TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS
|
TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS
|
||||||
CIC_TX_RETRY_DELAY: 15
|
CIC_TX_RETRY_DELAY: 15
|
||||||
|
BATCH_SIZE: ${RETRIER_BATCH_SIZE:-50}
|
||||||
|
#DATABASE_DEBUG: 1
|
||||||
depends_on:
|
depends_on:
|
||||||
- eth
|
- eth
|
||||||
- postgres
|
- postgres
|
||||||
@ -373,7 +375,7 @@ services:
|
|||||||
- -c
|
- -c
|
||||||
- |
|
- |
|
||||||
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
|
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
|
||||||
./start_retry.sh -vv
|
./start_retry.sh -vv
|
||||||
# command: "/root/start_retry.sh -q cic-eth -vv"
|
# command: "/root/start_retry.sh -q cic-eth -vv"
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user