From aa15353d6805eca85d126878d8d6c37c275a98b7 Mon Sep 17 00:00:00 2001 From: Louis Holbrook Date: Sun, 25 Apr 2021 12:24:17 +0000 Subject: [PATCH] Use task pool rpc for registry and eth queries with cic-eth view cli util --- apps/cic-eth/cic_eth/api/api_admin.py | 176 ++++++++++++------ apps/cic-eth/cic_eth/registry.py | 2 +- apps/cic-eth/cic_eth/runnable/ctrl.py | 1 - .../cic_eth/runnable/daemons/tasker.py | 1 - apps/cic-eth/cic_eth/runnable/transfer.py | 3 - apps/cic-eth/cic_eth/runnable/view.py | 18 +- apps/cic-eth/cic_eth/task.py | 39 +++- apps/cic-eth/cic_eth/version.py | 2 +- 8 files changed, 167 insertions(+), 75 deletions(-) diff --git a/apps/cic-eth/cic_eth/api/api_admin.py b/apps/cic-eth/cic_eth/api/api_admin.py index a0c804f..5862dd6 100644 --- a/apps/cic-eth/cic_eth/api/api_admin.py +++ b/apps/cic-eth/cic_eth/api/api_admin.py @@ -60,6 +60,29 @@ class AdminApi: self.call_address = call_address + def proxy_do(self, chain_spec, o): + s_proxy = celery.signature( + 'cic_eth.task.rpc_proxy', + [ + chain_spec.asdict(), + o, + 'default', + ], + queue=self.queue + ) + return s_proxy.apply_async() + + + + def registry(self): + s_registry = celery.signature( + 'cic_eth.task.registry', + [], + queue=self.queue + ) + return s_registry.apply_async() + + def unlock(self, chain_spec, address, flags=None): s_unlock = celery.signature( 'cic_eth.admin.ctrl.unlock', @@ -146,7 +169,6 @@ class AdminApi: # TODO: This check should most likely be in resend task itself tx_dict = s_get_tx_cache.apply_async().get() - #if tx_dict['status'] in [StatusEnum.REVERTED, StatusEnum.SUCCESS, StatusEnum.CANCELLED, StatusEnum.OBSOLETED]: if not is_alive(getattr(StatusEnum, tx_dict['status']).value): raise TxStateChangeError('Cannot resend mined or obsoleted transaction'.format(txold_hash_hex)) @@ -226,9 +248,6 @@ class AdminApi: break last_nonce = nonce_otx - #nonce_cache = Nonce.get(address) - #nonce_w3 = self.w3.eth.getTransactionCount(address, 'pending') - return { 'nonce': { #'network': nonce_cache, @@ -272,20 +291,6 @@ class AdminApi: return s_nonce.apply_async() -# # TODO: this is a stub, complete all checks -# def ready(self): -# """Checks whether all required initializations have been performed. -# -# :raises cic_eth.error.InitializationError: At least one setting pre-requisite has not been met. -# :raises KeyError: An address provided for initialization is not known by the keystore. -# """ -# addr = AccountRole.get_address('ETH_GAS_PROVIDER_ADDRESS') -# if addr == ZERO_ADDRESS: -# raise InitializationError('missing account ETH_GAS_PROVIDER_ADDRESS') -# -# self.w3.eth.sign(addr, text='666f6f') - - def account(self, chain_spec, address, include_sender=True, include_recipient=True, renderer=None, w=sys.stdout): """Lists locally originated transactions for the given Ethereum address. @@ -348,6 +353,7 @@ class AdminApi: # TODO: Add exception upon non-existent tx aswell as invalid tx data to docstring + # TODO: This method is WAY too long def tx(self, chain_spec, tx_hash=None, tx_raw=None, registry=None, renderer=None, w=sys.stdout): """Output local and network details about a given transaction with local origin. @@ -370,7 +376,6 @@ class AdminApi: if tx_raw != None: tx_hash = add_0x(keccak256_hex_to_hex(tx_raw)) - #tx_hash = self.w3.keccak(hexstr=tx_raw).hex() s = celery.signature( 'cic_eth.queue.query.get_tx_cache', @@ -386,38 +391,78 @@ class AdminApi: source_token = None if tx['source_token'] != ZERO_ADDRESS: - try: - source_token = registry.by_address(tx['source_token']) - #source_token = CICRegistry.get_address(chain_spec, tx['source_token']).contract - except UnknownContractError: - #source_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token']) - #source_token = CICRegistry.add_token(chain_spec, source_token_contract) - logg.warning('unknown source token contract {}'.format(tx['source_token'])) + if registry != None: + try: + source_token = registry.by_address(tx['source_token']) + except UnknownContractError: + logg.warning('unknown source token contract {} (direct)'.format(tx['source_token'])) + else: + s = celery.signature( + 'cic_eth.task.registry_address_lookup', + [ + chain_spec.asdict(), + tx['source_token'], + ], + queue=self.queue + ) + t = s.apply_async() + source_token = t.get() + if source_token == None: + logg.warning('unknown source token contract {} (task pool)'.format(tx['source_token'])) + destination_token = None - if tx['source_token'] != ZERO_ADDRESS: - try: - #destination_token = CICRegistry.get_address(chain_spec, tx['destination_token']) - destination_token = registry.by_address(tx['destination_token']) - except UnknownContractError: - #destination_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token']) - #destination_token = CICRegistry.add_token(chain_spec, destination_token_contract) - logg.warning('unknown destination token contract {}'.format(tx['destination_token'])) + if tx['destination_token'] != ZERO_ADDRESS: + if registry != None: + try: + destination_token = registry.by_address(tx['destination_token']) + except UnknownContractError: + logg.warning('unknown destination token contract {}'.format(tx['destination_token'])) + else: + s = celery.signature( + 'cic_eth.task.registry_address_lookup', + [ + chain_spec.asdict(), + tx['destination_token'], + ], + queue=self.queue + ) + t = s.apply_async() + destination_token = t.get() + if destination_token == None: + logg.warning('unknown destination token contract {} (task pool)'.format(tx['destination_token'])) + tx['sender_description'] = 'Custodial account' tx['recipient_description'] = 'Custodial account' o = code(tx['sender']) - r = self.rpc.do(o) + t = self.proxy_do(chain_spec, o) + r = t.get() if len(strip_0x(r, allow_empty=True)) > 0: - try: - #sender_contract = CICRegistry.get_address(chain_spec, tx['sender']) - sender_contract = registry.by_address(tx['sender'], sender_address=self.call_address) - tx['sender_description'] = 'Contract at {}'.format(tx['sender']) #sender_contract) - except UnknownContractError: - tx['sender_description'] = 'Unknown contract' - except KeyError as e: - tx['sender_description'] = 'Unknown contract' + if registry != None: + try: + sender_contract = registry.by_address(tx['sender'], sender_address=self.call_address) + tx['sender_description'] = 'Contract at {}'.format(tx['sender']) + except UnknownContractError: + tx['sender_description'] = 'Unknown contract' + except KeyError as e: + tx['sender_description'] = 'Unknown contract' + else: + s = celery.signature( + 'cic_eth.task.registry_address_lookup', + [ + chain_spec.asdict(), + tx['sender'], + ], + queue=self.queue + ) + t = s.apply_async() + tx['sender_description'] = t.get() + if tx['sender_description'] == None: + tx['sender_description'] = 'Unknown contract' + + else: s = celery.signature( 'cic_eth.eth.account.have', @@ -446,16 +491,31 @@ class AdminApi: tx['sender_description'] = role o = code(tx['recipient']) - r = self.rpc.do(o) + t = self.proxy_do(chain_spec, o) + r = t.get() if len(strip_0x(r, allow_empty=True)) > 0: - try: - #recipient_contract = CICRegistry.by_address(tx['recipient']) - recipient_contract = registry.by_address(tx['recipient']) - tx['recipient_description'] = 'Contract at {}'.format(tx['recipient']) #recipient_contract) - except UnknownContractError as e: - tx['recipient_description'] = 'Unknown contract' - except KeyError as e: - tx['recipient_description'] = 'Unknown contract' + if registry != None: + try: + recipient_contract = registry.by_address(tx['recipient']) + tx['recipient_description'] = 'Contract at {}'.format(tx['recipient']) + except UnknownContractError as e: + tx['recipient_description'] = 'Unknown contract' + except KeyError as e: + tx['recipient_description'] = 'Unknown contract' + else: + s = celery.signature( + 'cic_eth.task.registry_address_lookup', + [ + chain_spec.asdict(), + tx['recipient'], + ], + queue=self.queue + ) + t = s.apply_async() + tx['recipient_description'] = t.get() + if tx['recipient_description'] == None: + tx['recipient_description'] = 'Unknown contract' + else: s = celery.signature( 'cic_eth.eth.account.have', @@ -497,7 +557,8 @@ class AdminApi: r = None try: o = transaction(tx_hash) - r = self.rpc.do(o) + t = self.proxy_do(chain_spec, o) + r = t.get() if r != None: tx['network_status'] = 'Mempool' except Exception as e: @@ -506,7 +567,8 @@ class AdminApi: if r != None: try: o = receipt(tx_hash) - r = self.rpc.do(o) + t = self.proxy_do(chain_spec, o) + r = t.get() logg.debug('h {} o {}'.format(tx_hash, o)) if int(strip_0x(r['status'])) == 1: tx['network_status'] = 'Confirmed' @@ -521,11 +583,13 @@ class AdminApi: pass o = balance(tx['sender']) - r = self.rpc.do(o) + t = self.proxy_do(chain_spec, o) + r = t.get() tx['sender_gas_balance'] = r o = balance(tx['recipient']) - r = self.rpc.do(o) + t = self.proxy_do(chain_spec, o) + r = t.get() tx['recipient_gas_balance'] = r tx_unpacked = unpack(bytes.fromhex(strip_0x(tx['signed_tx'])), chain_spec) diff --git a/apps/cic-eth/cic_eth/registry.py b/apps/cic-eth/cic_eth/registry.py index e1e481e..2b5e2b9 100644 --- a/apps/cic-eth/cic_eth/registry.py +++ b/apps/cic-eth/cic_eth/registry.py @@ -29,5 +29,5 @@ def connect(rpc, chain_spec, registry_address): CICRegistry.address = registry_address registry = CICRegistry(chain_spec, rpc) registry_address = registry.by_name('ContractRegistry') - return registry + diff --git a/apps/cic-eth/cic_eth/runnable/ctrl.py b/apps/cic-eth/cic_eth/runnable/ctrl.py index 8f9df08..454b765 100644 --- a/apps/cic-eth/cic_eth/runnable/ctrl.py +++ b/apps/cic-eth/cic_eth/runnable/ctrl.py @@ -23,7 +23,6 @@ default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic') argparser = argparse.ArgumentParser() argparser.add_argument('-p', '--provider', dest='p', default='http://localhost:8545', type=str, help='Web3 provider url (http only)') -argparser.add_argument('-r', '--registry-address', type=str, help='CIC registry address') argparser.add_argument('-f', '--format', dest='f', default=default_format, type=str, help='Output format') argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') diff --git a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py index b6d74c5..de6a4f2 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py @@ -36,7 +36,6 @@ from cic_eth.eth import ( from cic_eth.admin import ( debug, ctrl, - token, ) from cic_eth.queue import ( query, diff --git a/apps/cic-eth/cic_eth/runnable/transfer.py b/apps/cic-eth/cic_eth/runnable/transfer.py index 6f40878..c63c72a 100644 --- a/apps/cic-eth/cic_eth/runnable/transfer.py +++ b/apps/cic-eth/cic_eth/runnable/transfer.py @@ -85,9 +85,6 @@ def main(): callback_queue=args.q, ) - #register = not args.no_register - #logg.debug('register {}'.format(register)) - #t = api.create_account(register=register) t = api.transfer(config.get('_SENDER'), config.get('_RECIPIENT'), config.get('_VALUE'), config.get('_SYMBOL')) ps.get_message() diff --git a/apps/cic-eth/cic_eth/runnable/view.py b/apps/cic-eth/cic_eth/runnable/view.py index 50adce3..ef08e2a 100644 --- a/apps/cic-eth/cic_eth/runnable/view.py +++ b/apps/cic-eth/cic_eth/runnable/view.py @@ -81,10 +81,14 @@ chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) rpc = EthHTTPConnection(args.p) -registry_address = config.get('CIC_REGISTRY_ADDRESS') +#registry_address = config.get('CIC_REGISTRY_ADDRESS') admin_api = AdminApi(rpc) +t = admin_api.registry() +registry_address = t.get() +logg.info('got registry address from task pool: {}'.format(registry_address)) + trusted_addresses_src = config.get('CIC_TRUST_ADDRESS') if trusted_addresses_src == None: logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS') @@ -151,14 +155,16 @@ def main(): txs = [] renderer = render_tx if len(config.get('_QUERY')) > 66: - registry = connect_registry(rpc, chain_spec, registry_address) - admin_api.tx(chain_spec, tx_raw=config.get('_QUERY'), registry=registry, renderer=renderer) + #registry = connect_registry(rpc, chain_spec, registry_address) + #admin_api.tx(chain_spec, tx_raw=config.get('_QUERY'), registry=registry, renderer=renderer) + admin_api.tx(chain_spec, tx_raw=config.get('_QUERY'), renderer=renderer) elif len(config.get('_QUERY')) > 42: - registry = connect_registry(rpc, chain_spec, registry_address) - admin_api.tx(chain_spec, tx_hash=config.get('_QUERY'), registry=registry, renderer=renderer) + #registry = connect_registry(rpc, chain_spec, registry_address) + #admin_api.tx(chain_spec, tx_hash=config.get('_QUERY'), registry=registry, renderer=renderer) + admin_api.tx(chain_spec, tx_hash=config.get('_QUERY'), renderer=renderer) elif len(config.get('_QUERY')) == 42: - registry = connect_registry(rpc, chain_spec, registry_address) + #registry = connect_registry(rpc, chain_spec, registry_address) txs = admin_api.account(chain_spec, config.get('_QUERY'), include_recipient=False, renderer=render_account) renderer = render_account elif len(config.get('_QUERY')) >= 4 and config.get('_QUERY')[:4] == 'lock': diff --git a/apps/cic-eth/cic_eth/task.py b/apps/cic-eth/cic_eth/task.py index 3fe201f..d0d41d4 100644 --- a/apps/cic-eth/cic_eth/task.py +++ b/apps/cic-eth/cic_eth/task.py @@ -7,9 +7,13 @@ import uuid # external imports import celery import sqlalchemy +from chainlib.chain import ChainSpec +from chainlib.connection import RPCConnection from chainlib.eth.constant import ZERO_ADDRESS from chainlib.eth.nonce import RPCNonceOracle from chainlib.eth.gas import RPCGasOracle +from cic_eth_registry import CICRegistry +from cic_eth_registry.error import UnknownContractError import liveness.linux # local imports @@ -101,12 +105,35 @@ class CriticalWeb3AndSignerTask(CriticalTask): safe_gas_refill_amount = safe_gas_threshold_amount * 5 -@celery_app.task(bind=True, base=BaseTask) -def hello(self): - time.sleep(0.1) - return id(SessionBase.create_session) - - @celery_app.task() def check_health(self): pass + + +# TODO: registry / rpc methods should perhaps be moved to better named module +@celery_app.task() +def registry(): + return CICRegistry.address + + +@celery_app.task() +def registry_address_lookup(chain_spec_dict, address, connection_tag='default'): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + conn = RPCConnection.connect(chain_spec, tag=connection_tag) + registry = CICRegistry(chain_spec, conn) + return registry.by_address(address) + + +@celery_app.task(throws=(UnknownContractError,)) +def registry_name_lookup(chain_spec_dict, name, connection_tag='default'): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + conn = RPCConnection.connect(chain_spec, tag=connection_tag) + registry = CICRegistry(chain_spec, conn) + return registry.by_name(name) + + +@celery_app.task() +def rpc_proxy(chain_spec_dict, o, connection_tag='default'): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + conn = RPCConnection.connect(chain_spec, tag=connection_tag) + return conn.do(o) diff --git a/apps/cic-eth/cic_eth/version.py b/apps/cic-eth/cic_eth/version.py index a1f9dd2..a57e9be 100644 --- a/apps/cic-eth/cic_eth/version.py +++ b/apps/cic-eth/cic_eth/version.py @@ -10,7 +10,7 @@ version = ( 0, 11, 0, - 'beta.10', + 'beta.11', ) version_object = semver.VersionInfo(