Merge remote-tracking branch 'origin/master' into lash/rehabilitate-cic-cache

This commit is contained in:
nolash 2021-04-02 15:17:35 +02:00
commit 2826ddef29
11 changed files with 302 additions and 152 deletions

View File

@ -134,7 +134,8 @@ class AdminApi:
return s_have.apply_async() return s_have.apply_async()
def resend(self, tx_hash_hex, chain_str, in_place=True, unlock=False): def resend(self, tx_hash_hex, chain_spec, in_place=True, unlock=False):
logg.debug('resend {}'.format(tx_hash_hex)) logg.debug('resend {}'.format(tx_hash_hex))
s_get_tx_cache = celery.signature( s_get_tx_cache = celery.signature(
'cic_eth.queue.tx.get_tx_cache', 'cic_eth.queue.tx.get_tx_cache',
@ -156,7 +157,7 @@ class AdminApi:
s = celery.signature( s = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas', 'cic_eth.eth.tx.resend_with_higher_gas',
[ [
chain_str, chain_spec.asdict(),
None, None,
1.01, 1.01,
], ],
@ -176,7 +177,7 @@ class AdminApi:
s_gas = celery.signature( s_gas = celery.signature(
'cic_eth.admin.ctrl.unlock_send', 'cic_eth.admin.ctrl.unlock_send',
[ [
chain_str, chain_spec.asdict(),
tx_dict['sender'], tx_dict['sender'],
], ],
queue=self.queue, queue=self.queue,
@ -218,7 +219,7 @@ class AdminApi:
blocking_tx = k blocking_tx = k
blocking_nonce = nonce_otx blocking_nonce = nonce_otx
elif nonce_otx - last_nonce > 1: elif nonce_otx - last_nonce > 1:
logg.error('nonce gap; {} followed {}'.format(nonce_otx, last_nonce)) logg.error('nonce gap; {} followed {} for account {}'.format(nonce_otx, last_nonce, tx['from']))
blocking_tx = k blocking_tx = k
blocking_nonce = nonce_otx blocking_nonce = nonce_otx
break break
@ -312,10 +313,10 @@ class AdminApi:
tx_dict = s.apply_async().get() tx_dict = s.apply_async().get()
if tx_dict['sender'] == address: if tx_dict['sender'] == address:
if tx_dict['nonce'] - last_nonce > 1: if tx_dict['nonce'] - last_nonce > 1:
logg.error('nonce gap; {} followed {} for tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['hash'])) logg.error('nonce gap; {} followed {} for address {} tx {}'.format(tx_dict['nonce'], last_nonce, tx_dict['sender'], tx_hash))
errors.append('nonce') errors.append('nonce')
elif tx_dict['nonce'] == last_nonce: elif tx_dict['nonce'] == last_nonce:
logg.warning('nonce {} duplicate in tx {}'.format(tx_dict['nonce'], tx_dict['hash'])) logg.info('nonce {} duplicate for address {} in tx {}'.format(tx_dict['nonce'], tx_dict['sender'], tx_hash))
last_nonce = tx_dict['nonce'] last_nonce = tx_dict['nonce']
if not include_sender: if not include_sender:
logg.debug('skipping sender tx {}'.format(tx_dict['tx_hash'])) logg.debug('skipping sender tx {}'.format(tx_dict['tx_hash']))
@ -480,15 +481,17 @@ class AdminApi:
tx['destination_token_symbol'] = destination_token.symbol() tx['destination_token_symbol'] = destination_token.symbol()
tx['recipient_token_balance'] = source_token.function('balanceOf')(tx['recipient']).call() tx['recipient_token_balance'] = source_token.function('balanceOf')(tx['recipient']).call()
tx['network_status'] = 'Not submitted' # TODO: this can mean either not subitted or culled, need to check other txs with same nonce to determine which
tx['network_status'] = 'Not in node'
r = None r = None
try: try:
o = transaction(tx_hash) o = transaction(tx_hash)
r = self.rpc.do(o) r = self.rpc.do(o)
if r != None:
tx['network_status'] = 'Mempool'
except Exception as e: except Exception as e:
logg.warning('(too permissive exception handler, please fix!) {}'.format(e)) logg.warning('(too permissive exception handler, please fix!) {}'.format(e))
tx['network_status'] = 'Mempool'
if r != None: if r != None:
try: try:

View File

@ -7,7 +7,10 @@ import requests
from chainlib.eth.constant import ZERO_ADDRESS from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.eth.address import is_checksum_address from chainlib.eth.address import is_checksum_address
from chainlib.eth.gas import balance from chainlib.eth.gas import (
balance,
price,
)
from chainlib.eth.error import ( from chainlib.eth.error import (
EthException, EthException,
NotFoundEthException, NotFoundEthException,
@ -17,11 +20,15 @@ from chainlib.eth.tx import (
receipt, receipt,
raw, raw,
TxFormat, TxFormat,
TxFactory,
unpack, unpack,
) )
from chainlib.connection import RPCConnection from chainlib.connection import RPCConnection
from chainlib.hash import keccak256_hex_to_hex from chainlib.hash import keccak256_hex_to_hex
from chainlib.eth.gas import Gas from chainlib.eth.gas import (
Gas,
OverrideGasOracle,
)
from chainlib.eth.contract import ( from chainlib.eth.contract import (
abi_decode_single, abi_decode_single,
ABIContractType, ABIContractType,
@ -52,6 +59,7 @@ from cic_eth.queue.tx import (
get_tx, get_tx,
register_tx, register_tx,
get_nonce_tx, get_nonce_tx,
create as queue_create,
) )
from cic_eth.error import OutOfGasError from cic_eth.error import OutOfGasError
from cic_eth.error import LockedError from cic_eth.error import LockedError
@ -370,7 +378,7 @@ def refill_gas(self, recipient_address, chain_spec_dict):
@celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask) @celery_app.task(bind=True, base=CriticalSQLAlchemyAndSignerTask)
def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_factor=1.1): def resend_with_higher_gas(self, txold_hash_hex, chain_spec_dict, gas=None, default_factor=1.1):
"""Create a new transaction from an existing one with same nonce and higher gas price. """Create a new transaction from an existing one with same nonce and higher gas price.
:param txold_hash_hex: Transaction to re-create :param txold_hash_hex: Transaction to re-create
@ -394,46 +402,55 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_str, gas=None, default_fa
session.close() session.close()
raise NotLocalTxError(txold_hash_hex) raise NotLocalTxError(txold_hash_hex)
chain_spec = ChainSpec.from_chain_str(chain_str) chain_spec = ChainSpec.from_dict(chain_spec_dict)
c = RpcClient(chain_spec)
tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx[2:]) tx_signed_raw_bytes = bytes.fromhex(otx.signed_tx[2:])
tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id()) tx = unpack(tx_signed_raw_bytes, chain_spec.chain_id())
logg.debug('resend otx {} {}'.format(tx, otx.signed_tx)) logg.debug('resend otx {} {}'.format(tx, otx.signed_tx))
queue = self.request.delivery_info['routing_key'] queue = self.request.delivery_info.get('routing_key')
logg.debug('before {}'.format(tx)) logg.debug('before {}'.format(tx))
if gas != None:
tx['gasPrice'] = gas rpc = RPCConnection.connect(chain_spec, 'default')
else: new_gas_price = gas
gas_price = c.gas_price() if new_gas_price == None:
if tx['gasPrice'] > gas_price: o = price()
logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(gas_price, tx['gasPrice'])) r = rpc.do(o)
current_gas_price = int(r, 16)
if tx['gasPrice'] > current_gas_price:
logg.info('Network gas price {} is lower than overdue tx gas price {}'.format(curent_gas_price, tx['gasPrice']))
#tx['gasPrice'] = int(tx['gasPrice'] * default_factor) #tx['gasPrice'] = int(tx['gasPrice'] * default_factor)
tx['gasPrice'] += 1 new_gas_price = tx['gasPrice'] + 1
else: else:
new_gas_price = int(tx['gasPrice'] * default_factor) new_gas_price = int(tx['gasPrice'] * default_factor)
if gas_price > new_gas_price: #if gas_price > new_gas_price:
tx['gasPrice'] = gas_price # tx['gasPrice'] = gas_price
else: #else:
tx['gasPrice'] = new_gas_price # tx['gasPrice'] = new_gas_price
(tx_hash_hex, tx_signed_raw_hex) = sign_tx(tx, chain_str)
rpc_signer = RPCConnection.connect(chain_spec, 'signer')
gas_oracle = OverrideGasOracle(price=new_gas_price, conn=rpc)
c = TxFactory(signer=rpc_signer, gas_oracle=gas_oracle, chain_id=chain_spec.chain_id())
logg.debug('change gas price from old {} to new {} for tx {}'.format(tx['gasPrice'], new_gas_price, tx))
tx['gasPrice'] = new_gas_price
(tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx)
queue_create( queue_create(
tx['nonce'], tx['nonce'],
tx['from'], tx['from'],
tx_hash_hex, tx_hash_hex,
tx_signed_raw_hex, tx_signed_raw_hex,
chain_str, chain_spec,
session=session, session=session,
) )
TxCache.clone(txold_hash_hex, tx_hash_hex, session=session) TxCache.clone(txold_hash_hex, tx_hash_hex, session=session)
session.close() session.close()
s = create_check_gas_and_send_task( s = create_check_gas_task(
[tx_signed_raw_hex], [tx_signed_raw_hex],
chain_str, chain_spec,
tx['from'], tx['from'],
tx['gasPrice'] * tx['gas'], tx['gasPrice'] * tx['gas'],
[tx_hash_hex], [tx_hash_hex],

View File

@ -676,6 +676,7 @@ def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, se
q = q.filter(Otx.status.op('&')(status)>0) q = q.filter(Otx.status.op('&')(status)>0)
if not_status != None: if not_status != None:
q = q.filter(Otx.status.op('&')(not_status)==0) q = q.filter(Otx.status.op('&')(not_status)==0)
q = q.order_by(Otx.nonce.asc(), Otx.date_created.asc())
i = 0 i = 0
for o in q.all(): for o in q.all():
if limit > 0 and i == limit: if limit > 0 and i == limit:
@ -687,7 +688,7 @@ def get_status_tx(status, not_status=None, before=None, exact=False, limit=0, se
# TODO: move query to model # TODO: move query to model
def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, chain_id=0, session=None): def get_upcoming_tx(status=StatusEnum.READYSEND, not_status=None, recipient=None, before=None, limit=0, chain_id=0, session=None):
"""Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions. """Returns the next pending transaction, specifically the transaction with the lowest nonce, for every recipient that has pending transactions.
Will omit addresses that have the LockEnum.SEND bit in Lock set. Will omit addresses that have the LockEnum.SEND bit in Lock set.
@ -721,7 +722,10 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
if status == StatusEnum.PENDING: if status == StatusEnum.PENDING:
q_outer = q_outer.filter(Otx.status==status.value) q_outer = q_outer.filter(Otx.status==status.value)
else: else:
q_outer = q_outer.filter(Otx.status.op('&')(status.value)==status.value) q_outer = q_outer.filter(Otx.status.op('&')(status)==status)
if not_status != None:
q_outer = q_outer.filter(Otx.status.op('&')(not_status)==0)
if recipient != None: if recipient != None:
q_outer = q_outer.filter(TxCache.recipient==recipient) q_outer = q_outer.filter(TxCache.recipient==recipient)
@ -730,6 +734,7 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
txs = {} txs = {}
i = 0
for r in q_outer.all(): for r in q_outer.all():
q = session.query(Otx) q = session.query(Otx)
q = q.join(TxCache) q = q.join(TxCache)
@ -758,6 +763,10 @@ def get_upcoming_tx(status=StatusEnum.READYSEND, recipient=None, before=None, ch
session.add(o) session.add(o)
session.commit() session.commit()
i += 1
if limit > 0 and limit == i:
break
SessionBase.release_session(session) SessionBase.release_session(session)
return txs return txs

View File

@ -88,7 +88,7 @@ run = True
class DispatchSyncer: class DispatchSyncer:
yield_delay = 0.005 yield_delay = 0.0005
def __init__(self, chain_spec): def __init__(self, chain_spec):
self.chain_spec = chain_spec self.chain_spec = chain_spec

View File

@ -1,3 +1,4 @@
# standard imports
import os import os
import sys import sys
import logging import logging
@ -5,22 +6,36 @@ import argparse
import re import re
import datetime import datetime
import web3 # external imports
import confini import confini
import celery import celery
from cic_eth_registry import CICRegistry from cic_eth_registry import CICRegistry
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.eth.tx import unpack
from chainlib.connection import RPCConnection
from chainlib.eth.block import (
block_latest,
block_by_number,
Block,
)
from chainsyncer.driver import HeadSyncer
from chainsyncer.backend import MemBackend
from chainsyncer.error import NoBlockForYou
# local imports
from cic_eth.db import dsn_from_config from cic_eth.db import dsn_from_config
from cic_eth.db import SessionBase from cic_eth.db import SessionBase
from cic_eth.eth import RpcClient from cic_eth.queue.tx import (
from cic_eth.sync.retry import RetrySyncer get_status_tx,
from cic_eth.queue.tx import get_status_tx get_tx,
from cic_eth.queue.tx import get_tx # get_upcoming_tx,
)
from cic_eth.admin.ctrl import lock_send from cic_eth.admin.ctrl import lock_send
from cic_eth.db.enum import StatusEnum from cic_eth.db.enum import (
from cic_eth.db.enum import LockEnum StatusEnum,
from cic_eth.eth.util import unpack_signed_raw_tx_hex StatusBits,
LockEnum,
)
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
@ -31,7 +46,8 @@ argparser = argparse.ArgumentParser(description='daemon that monitors transactio
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec')
argparser.add_argument('--retry-delay', dest='retry_delay', type=str, help='seconds to wait for retrying a transaction that is marked as sent') argparser.add_argument('--batch-size', dest='batch_size', type=int, default=50, help='max amount of txs to resend per iteration')
argparser.add_argument('--retry-delay', dest='retry_delay', type=int, help='seconds to wait for retrying a transaction that is marked as sent')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to')
argparser.add_argument('-v', help='be verbose', action='store_true') argparser.add_argument('-v', help='be verbose', action='store_true')
@ -51,7 +67,6 @@ config.process()
# override args # override args
args_override = { args_override = {
'ETH_PROVIDER': getattr(args, 'p'), 'ETH_PROVIDER': getattr(args, 'p'),
'ETH_ABI_DIR': getattr(args, 'abi_dir'),
'CIC_CHAIN_SPEC': getattr(args, 'i'), 'CIC_CHAIN_SPEC': getattr(args, 'i'),
'CIC_TX_RETRY_DELAY': getattr(args, 'retry_delay'), 'CIC_TX_RETRY_DELAY': getattr(args, 'retry_delay'),
} }
@ -59,6 +74,7 @@ config.dict_override(args_override, 'cli flag')
config.censor('PASSWORD', 'DATABASE') config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL') config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
config.add(args.batch_size, '_BATCH_SIZE', True)
app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL')) app = celery.Celery(backend=config.get('CELERY_RESULT_URL'), broker=config.get('CELERY_BROKER_URL'))
@ -66,10 +82,10 @@ queue = args.q
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
RPCConnection.registry_location(args.p, chain_spec, tag='default') RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, tag='default')
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn) SessionBase.connect(dsn, debug=config.true('DATABASE_DEBUG'))
straggler_delay = int(config.get('CIC_TX_RETRY_DELAY')) straggler_delay = int(config.get('CIC_TX_RETRY_DELAY'))
@ -178,53 +194,85 @@ def dispatch(conn, chain_spec):
# s_send.apply_async() # s_send.apply_async()
class RetrySyncer(Syncer): class StragglerFilter:
def __init__(self, chain_spec, stalled_grace_seconds, failed_grace_seconds=None, final_func=None): def __init__(self, chain_spec, queue='cic-eth'):
self.chain_spec = chain_spec
self.queue = queue
def filter(self, conn, block, tx, db_session=None):
logg.debug('tx {}'.format(tx))
s_send = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas',
[
tx,
self.chain_spec.asdict(),
],
queue=self.queue,
)
return s_send.apply_async()
#return s_send
def __str__(self):
return 'stragglerfilter'
class RetrySyncer(HeadSyncer):
def __init__(self, conn, chain_spec, stalled_grace_seconds, batch_size=50, failed_grace_seconds=None):
backend = MemBackend(chain_spec, None)
super(RetrySyncer, self).__init__(backend)
self.chain_spec = chain_spec self.chain_spec = chain_spec
if failed_grace_seconds == None: if failed_grace_seconds == None:
failed_grace_seconds = stalled_grace_seconds failed_grace_seconds = stalled_grace_seconds
self.stalled_grace_seconds = stalled_grace_seconds self.stalled_grace_seconds = stalled_grace_seconds
self.failed_grace_seconds = failed_grace_seconds self.failed_grace_seconds = failed_grace_seconds
self.final_func = final_func self.batch_size = batch_size
self.conn = conn
def get(self): def get(self, conn):
# before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.failed_grace_seconds) o = block_latest()
# failed_txs = get_status_tx( r = conn.do(o)
# StatusEnum.SENDFAIL.value, (pair, flags) = self.backend.get()
# before=before, n = int(r, 16)
# ) if n == pair[0]:
before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds) raise NoBlockForYou('block {} already checked'.format(n))
stalled_txs = get_status_tx( o = block_by_number(n)
StatusBits.IN_NETWORK.value, r = conn.do(o)
not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE, b = Block(r)
before=before, return b
)
# return list(failed_txs.keys()) + list(stalled_txs.keys())
return stalled_txs
def process(self, conn, ref):
logg.debug('tx {}'.format(ref))
for f in self.filter:
f(conn, ref, None, str(self.chain_spec))
def process(self, conn, block):
before = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.stalled_grace_seconds)
stalled_txs = get_status_tx(
StatusBits.IN_NETWORK.value,
not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE,
before=before,
limit=self.batch_size,
)
# stalled_txs = get_upcoming_tx(
# status=StatusBits.IN_NETWORK.value,
# not_status=StatusBits.FINAL | StatusBits.MANUAL | StatusBits.OBSOLETE,
# before=before,
# limit=self.batch_size,
# )
for tx in stalled_txs:
self.filter.apply(self.conn, block, tx)
self.backend.set(block.number, 0)
def loop(self, interval):
while self.running and Syncer.running_global:
rpc = RPCConnection.connect(self.chain_spec, 'default')
for tx in self.get():
self.process(rpc, tx)
if self.final_func != None:
self.final_func(rpc, self.chain_spec)
time.sleep(interval)
def main(): def main():
#o = block_latest()
syncer = RetrySyncer(chain_spec, straggler_delay, final_func=dispatch) conn = RPCConnection.connect(chain_spec, 'default')
syncer.filter.append(sendfail_filter) #block = conn.do(o)
syncer.loop(float(straggler_delay)) syncer = RetrySyncer(conn, chain_spec, straggler_delay, batch_size=config.get('_BATCH_SIZE'))
syncer.backend.set(0, 0)
syncer.add_filter(StragglerFilter(chain_spec, queue=queue))
syncer.loop(float(straggler_delay), conn)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -7,13 +7,10 @@ import os
# third-party imports # third-party imports
import celery import celery
import confini import confini
import web3 from chainlib.chain import ChainSpec
from cic_registry import CICRegistry from chainlib.eth.connection import EthHTTPConnection
from cic_registry.chain import ChainSpec
from cic_registry.chain import ChainRegistry
# local imports # local imports
from cic_eth.eth.rpc import RpcClient
from cic_eth.api.api_admin import AdminApi from cic_eth.api.api_admin import AdminApi
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
@ -55,41 +52,20 @@ args_override = {
config.censor('PASSWORD', 'DATABASE') config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL') config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
config.add(args.tx_hash, '_TX_HASH', True)
config.add(args.unlock, '_UNLOCK', True)
chain_spec = ChainSpec.from_chain_str(args.i) chain_spec = ChainSpec.from_chain_str(args.i)
chain_str = str(chain_spec)
re_websocket = re.compile('^wss?://') rpc = EthHTTPConnection(config.get('ETH_PROVIDER'))
re_http = re.compile('^https?://')
blockchain_provider = config.get('ETH_PROVIDER')
if re.match(re_websocket, blockchain_provider) != None:
blockchain_provider = web3.Web3.WebsocketProvider(blockchain_provider)
elif re.match(re_http, blockchain_provider) != None:
blockchain_provider = web3.Web3.HTTPProvider(blockchain_provider)
else:
raise ValueError('unknown provider url {}'.format(blockchain_provider))
def web3_constructor():
w3 = web3.Web3(blockchain_provider)
return (blockchain_provider, w3)
RpcClient.set_constructor(web3_constructor)
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
c = RpcClient(chain_spec)
CICRegistry.init(c.w3, config.get('CIC_REGISTRY_ADDRESS'), chain_spec)
chain_registry = ChainRegistry(chain_spec)
CICRegistry.add_chain_registry(chain_registry)
CICRegistry.add_path(config.get('ETH_ABI_DIR'))
CICRegistry.load_for(chain_spec)
def main(): def main():
api = AdminApi(c) api = AdminApi(rpc)
tx_details = api.tx(chain_spec, args.tx_hash) tx_details = api.tx(chain_spec, args.tx_hash)
t = api.resend(args.tx_hash, chain_str, unlock=True) t = api.resend(args.tx_hash, chain_spec, unlock=config.get('_UNLOCK'))
print(t.get_leaf())
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -12,10 +12,12 @@ from chainlib.eth.tx import (
transaction, transaction,
receipt, receipt,
) )
from hexathon import strip_0x
# local imports # local imports
from cic_eth.queue.tx import register_tx from cic_eth.queue.tx import register_tx
from cic_eth.eth.tx import cache_gas_data from cic_eth.eth.tx import cache_gas_data
from cic_eth.db.models.otx import Otx
logg = logging.getLogger() logg = logging.getLogger()
@ -60,6 +62,7 @@ def test_tx_send(
assert rcpt['status'] == 1 assert rcpt['status'] == 1
@pytest.mark.skip()
def test_sync_tx( def test_sync_tx(
default_chain_spec, default_chain_spec,
eth_rpc, eth_rpc,
@ -67,3 +70,44 @@ def test_sync_tx(
celery_worker, celery_worker,
): ):
pass pass
def test_resend_with_higher_gas(
init_database,
default_chain_spec,
eth_rpc,
eth_signer,
agent_roles,
celery_worker,
):
chain_id = default_chain_spec.chain_id()
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id)
(tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024, tx_format=TxFormat.RLP_SIGNED)
#unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), chain_id)
register_tx(tx_hash_hex, tx_signed_raw_hex, default_chain_spec, None, session=init_database)
cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict())
tx_before = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), default_chain_spec.chain_id())
s = celery.signature(
'cic_eth.eth.tx.resend_with_higher_gas',
[
tx_hash_hex,
default_chain_spec.asdict(),
],
queue=None,
)
t = s.apply_async()
r = t.get_leaf()
q = init_database.query(Otx)
q = q.filter(Otx.tx_hash==r)
otx = q.first()
if otx == None:
raise NotLocalTxError(r)
tx_after = unpack(bytes.fromhex(strip_0x(otx.signed_tx)), default_chain_spec.chain_id())
logg.debug('gasprices before {} after {}'.format(tx_before['gasPrice'], tx_after['gasPrice']))
assert tx_after['gasPrice'] > tx_before['gasPrice']

View File

@ -23,7 +23,7 @@ from chainlib.eth.gas import RPCGasOracle
from chainlib.eth.nonce import RPCNonceOracle from chainlib.eth.nonce import RPCNonceOracle
from cic_types.processor import generate_metadata_pointer from cic_types.processor import generate_metadata_pointer
from eth_accounts_index import AccountRegistry from eth_accounts_index import AccountRegistry
from cic_eth_registry import CICRegistry from contract_registry import Registry
from crypto_dev_signer.keystore.dict import DictKeystore from crypto_dev_signer.keystore.dict import DictKeystore
from crypto_dev_signer.eth.signer.defaultsigner import ReferenceSigner as EIP155Signer from crypto_dev_signer.eth.signer.defaultsigner import ReferenceSigner as EIP155Signer
from crypto_dev_signer.keystore.keyfile import to_dict as to_keyfile_dict from crypto_dev_signer.keystore.keyfile import to_dict as to_keyfile_dict
@ -88,9 +88,11 @@ signer = EIP155Signer(keystore)
nonce_oracle = RPCNonceOracle(signer_address, rpc) nonce_oracle = RPCNonceOracle(signer_address, rpc)
CICRegistry.address = config.get('CIC_REGISTRY_ADDRESS') registry = Registry()
registry = CICRegistry(chain_spec, rpc) o = registry.address_of(config.get('CIC_REGISTRY_ADDRESS'), 'AccountRegistry')
account_registry_address = registry.by_name('AccountRegistry') r = rpc.do(o)
account_registry_address = registry.parse_address_of(r)
logg.info('using account registry {}'.format(account_registry_address))
keyfile_dir = os.path.join(config.get('_USERDIR'), 'keystore') keyfile_dir = os.path.join(config.get('_USERDIR'), 'keystore')
os.makedirs(keyfile_dir) os.makedirs(keyfile_dir)

View File

@ -36,7 +36,7 @@ argparser.add_argument('--redis-port', dest='redis_port', type=int, help='redis
argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback') argparser.add_argument('--redis-db', dest='redis_db', type=int, help='redis db to use for task submission and callback')
argparser.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback') argparser.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback')
argparser.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback') argparser.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback')
argparser.add_argument('--batch-size', dest='batch_size', default=50, type=int, help='burst size of sending transactions to node') argparser.add_argument('--batch-size', dest='batch_size', default=80, type=int, help='burst size of sending transactions to node')
argparser.add_argument('--batch-delay', dest='batch_delay', default=2, type=int, help='seconds delay between batches') argparser.add_argument('--batch-delay', dest='batch_delay', default=2, type=int, help='seconds delay between batches')
argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout') argparser.add_argument('--timeout', default=60.0, type=float, help='Callback timeout')
argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue') argparser.add_argument('-q', type=str, default='cic-eth', help='Task queue')

View File

@ -10,6 +10,7 @@ import hashlib
import csv import csv
import json import json
import urllib import urllib
import copy
# external imports # external imports
import celery import celery
@ -39,7 +40,6 @@ from chainlib.eth.gas import (
from chainlib.eth.tx import TxFactory from chainlib.eth.tx import TxFactory
from chainlib.eth.rpc import jsonrpc_template from chainlib.eth.rpc import jsonrpc_template
from chainlib.eth.error import EthException from chainlib.eth.error import EthException
from cic_eth.api.api_admin import AdminApi
from cic_types.models.person import ( from cic_types.models.person import (
Person, Person,
generate_metadata_pointer, generate_metadata_pointer,
@ -51,12 +51,27 @@ logg = logging.getLogger()
config_dir = '/usr/local/etc/cic-syncer' config_dir = '/usr/local/etc/cic-syncer'
custodial_tests = [
'local_key',
'gas',
'faucet',
]
all_tests = custodial_tests + [
'accounts_index',
'balance',
'metadata',
]
argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks') argparser = argparse.ArgumentParser(description='daemon that monitors transactions in new blocks')
argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address') argparser.add_argument('-p', '--provider', dest='p', type=str, help='chain rpc provider address')
argparser.add_argument('-c', type=str, default=config_dir, help='config root to use') argparser.add_argument('-c', type=str, default=config_dir, help='config root to use')
argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1', help='chain spec') argparser.add_argument('--old-chain-spec', type=str, dest='old_chain_spec', default='evm:oldchain:1', help='chain spec')
argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec') argparser.add_argument('-i', '--chain-spec', type=str, dest='i', help='chain spec')
argparser.add_argument('--meta-provider', type=str, dest='meta_provider', default='http://localhost:63380', help='cic-meta url') argparser.add_argument('--meta-provider', type=str, dest='meta_provider', default='http://localhost:63380', help='cic-meta url')
argparser.add_argument('--skip-custodial', dest='skip_custodial', action='store_true', help='skip all custodial verifications')
argparser.add_argument('--exclude', action='append', type=str, default=[], help='skip specified verification')
argparser.add_argument('--include', action='append', type=str, help='include specified verification')
argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address') argparser.add_argument('-r', '--registry-address', type=str, dest='r', help='CIC Registry address')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-x', '--exit-on-error', dest='x', action='store_true', help='Halt exection on error') argparser.add_argument('-x', '--exit-on-error', dest='x', action='store_true', help='Halt exection on error')
@ -95,14 +110,61 @@ user_dir = args.user_dir # user_out_dir from import_users.py
meta_url = args.meta_provider meta_url = args.meta_provider
exit_on_error = args.x exit_on_error = args.x
active_tests = []
exclude = []
include = args.include
if args.include == None:
include = all_tests
for t in args.exclude:
if t not in all_tests:
raise ValueError('Cannot exclude unknown verification "{}"'.format(t))
exclude.append(t)
if args.skip_custodial:
logg.info('will skip all custodial verifications ({})'.format(','.join(custodial_tests)))
for t in custodial_tests:
if t not in exclude:
exclude.append(t)
for t in include:
if t not in all_tests:
raise ValueError('Cannot include unknown verification "{}"'.format(t))
if t not in exclude:
active_tests.append(t)
logg.info('will perform verification "{}"'.format(t))
api = None
for t in custodial_tests:
if t in active_tests:
from cic_eth.api.api_admin import AdminApi
api = AdminApi(None)
logg.info('activating custodial module'.format(t))
break
cols = os.get_terminal_size().columns
def to_terminalwidth(s):
ss = s.ljust(int(cols)-1)
ss += "\r"
return ss
def default_outfunc(s):
ss = to_terminalwidth(s)
sys.stdout.write(ss)
outfunc = default_outfunc
if logg.isEnabledFor(logging.DEBUG):
outfunc = logg.debug
class VerifierState: class VerifierState:
def __init__(self, item_keys): def __init__(self, item_keys, active_tests=None):
self.items = {} self.items = {}
for k in item_keys: for k in item_keys:
logg.info('k {}'.format(k))
self.items[k] = 0 self.items[k] = 0
if active_tests == None:
self.active_tests = copy.copy(item_keys)
else:
self.active_tests = copy.copy(active_tests)
def poke(self, item_key): def poke(self, item_key):
@ -112,7 +174,10 @@ class VerifierState:
def __str__(self): def __str__(self):
r = '' r = ''
for k in self.items.keys(): for k in self.items.keys():
r += '{}: {}\n'.format(k, self.items[k]) if k in self.active_tests:
r += '{}: {}\n'.format(k, self.items[k])
else:
r += '{}: skipped\n'.format(k)
return r return r
@ -148,10 +213,10 @@ class Verifier:
verifymethods = [] verifymethods = []
for k in dir(self): for k in dir(self):
if len(k) > 7 and k[:7] == 'verify_': if len(k) > 7 and k[:7] == 'verify_':
logg.info('adding verify method {}'.format(k)) logg.debug('verifier has verify method {}'.format(k))
verifymethods.append(k[7:]) verifymethods.append(k[7:])
self.state = VerifierState(verifymethods) self.state = VerifierState(verifymethods, active_tests=active_tests)
def verify_accounts_index(self, address, balance=None): def verify_accounts_index(self, address, balance=None):
@ -233,26 +298,14 @@ class Verifier:
raise VerifierError(o_retrieved, 'metadata (person)') raise VerifierError(o_retrieved, 'metadata (person)')
def verify(self, address, balance): def verify(self, address, balance, debug_stem=None):
logg.debug('verify {} {}'.format(address, balance))
methods = [ for k in active_tests:
'local_key', s = '{} {}'.format(debug_stem, k)
'accounts_index', outfunc(s)
'balance',
# 'metadata',
'gas',
'faucet',
]
for k in methods:
try: try:
m = getattr(self, 'verify_{}'.format(k)) m = getattr(self, 'verify_{}'.format(k))
m(address, balance) m(address, balance)
# self.verify_local_key(address)
# self.verify_accounts_index(address)
# self.verify_balance(address, balance)
# self.verify_metadata(address)
except VerifierError as e: except VerifierError as e:
logline = 'verification {} failed for {}: {}'.format(k, address, str(e)) logline = 'verification {} failed for {}: {}'.format(k, address, str(e))
if self.exit_on_error: if self.exit_on_error:
@ -266,10 +319,6 @@ class Verifier:
return str(self.state) return str(self.state)
class MockClient:
w3 = None
def main(): def main():
global chain_str, block_offset, user_dir global chain_str, block_offset, user_dir
@ -291,7 +340,6 @@ def main():
o['params'].append(txf.normalize(tx)) o['params'].append(txf.normalize(tx))
o['params'].append('latest') o['params'].append('latest')
r = conn.do(o) r = conn.do(o)
print('r {}'.format(r))
token_index_address = to_checksum_address(eth_abi.decode_single('address', bytes.fromhex(strip_0x(r)))) token_index_address = to_checksum_address(eth_abi.decode_single('address', bytes.fromhex(strip_0x(r))))
logg.info('found token index address {}'.format(token_index_address)) logg.info('found token index address {}'.format(token_index_address))
@ -320,6 +368,7 @@ def main():
logg.info('found faucet {}'.format(faucet_address)) logg.info('found faucet {}'.format(faucet_address))
# Get Sarafu token address # Get Sarafu token address
tx = txf.template(ZERO_ADDRESS, token_index_address) tx = txf.template(ZERO_ADDRESS, token_index_address)
data = add_0x(registry_addressof_method) data = add_0x(registry_addressof_method)
@ -333,7 +382,6 @@ def main():
o['params'].append(txf.normalize(tx)) o['params'].append(txf.normalize(tx))
o['params'].append('latest') o['params'].append('latest')
r = conn.do(o) r = conn.do(o)
print('r {}'.format(r))
sarafu_token_address = to_checksum_address(eth_abi.decode_single('address', bytes.fromhex(strip_0x(r)))) sarafu_token_address = to_checksum_address(eth_abi.decode_single('address', bytes.fromhex(strip_0x(r))))
logg.info('found token address {}'.format(sarafu_token_address)) logg.info('found token address {}'.format(sarafu_token_address))
@ -348,7 +396,7 @@ def main():
try: try:
address = to_checksum_address(r[0]) address = to_checksum_address(r[0])
#sys.stdout.write('loading balance {} {}'.format(i, address).ljust(200) + "\r") #sys.stdout.write('loading balance {} {}'.format(i, address).ljust(200) + "\r")
logg.debug('loading balance {} {}'.format(i, address).ljust(200)) outfunc('loading balance {} {}'.format(i, address)) #.ljust(200))
except ValueError: except ValueError:
break break
balance = int(r[1].rstrip()) balance = int(r[1].rstrip())
@ -357,11 +405,10 @@ def main():
f.close() f.close()
api = AdminApi(MockClient())
verifier = Verifier(conn, api, gas_oracle, chain_spec, account_index_address, sarafu_token_address, faucet_address, user_dir, exit_on_error) verifier = Verifier(conn, api, gas_oracle, chain_spec, account_index_address, sarafu_token_address, faucet_address, user_dir, exit_on_error)
user_new_dir = os.path.join(user_dir, 'new') user_new_dir = os.path.join(user_dir, 'new')
i = 0
for x in os.walk(user_new_dir): for x in os.walk(user_new_dir):
for y in x[2]: for y in x[2]:
if y[len(y)-5:] != '.json': if y[len(y)-5:] != '.json':
@ -377,7 +424,7 @@ def main():
f.close() f.close()
u = Person.deserialize(o) u = Person.deserialize(o)
logg.debug('data {}'.format(u.identities['evm'])) #logg.debug('data {}'.format(u.identities['evm']))
subchain_str = '{}:{}'.format(chain_spec.common_name(), chain_spec.network_id()) subchain_str = '{}:{}'.format(chain_spec.common_name(), chain_spec.network_id())
new_address = u.identities['evm'][subchain_str][0] new_address = u.identities['evm'][subchain_str][0]
@ -388,9 +435,11 @@ def main():
balance = balances[old_address] balance = balances[old_address]
except KeyError: except KeyError:
logg.info('no old balance found for {}, assuming 0'.format(old_address)) logg.info('no old balance found for {}, assuming 0'.format(old_address))
logg.debug('checking {} -> {} = {}'.format(old_address, new_address, balance))
verifier.verify(new_address, balance) s = 'checking {}: {} -> {} = {}'.format(i, old_address, new_address, balance)
verifier.verify(new_address, balance, debug_stem=s)
i += 1
print(verifier) print(verifier)

View File

@ -317,8 +317,8 @@ services:
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis} CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis}
CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis} CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis}
TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS
#DATABASE_DEBUG: ${DATABASE_DEBUG:-false} DATABASE_DEBUG: ${DATABASE_DEBUG:-false}
DATABASE_DEBUG: 1 #DATABASE_DEBUG: 1
depends_on: depends_on:
- eth - eth
@ -359,6 +359,8 @@ services:
CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis} CELERY_RESULT_URL: ${CELERY_RESULT_URL:-redis://redis}
TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS TASKS_TRANSFER_CALLBACKS: $TASKS_TRANSFER_CALLBACKS
CIC_TX_RETRY_DELAY: 15 CIC_TX_RETRY_DELAY: 15
BATCH_SIZE: ${RETRIER_BATCH_SIZE:-50}
#DATABASE_DEBUG: 1
depends_on: depends_on:
- eth - eth
- postgres - postgres
@ -373,7 +375,7 @@ services:
- -c - -c
- | - |
if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi
./start_retry.sh -v ./start_retry.sh -vv
# command: "/root/start_retry.sh -q cic-eth -vv" # command: "/root/start_retry.sh -q cic-eth -vv"