diff --git a/apps/cic-eth/cic_eth/admin/ctrl.py b/apps/cic-eth/cic_eth/admin/ctrl.py index 0b55ffc5..af6dd647 100644 --- a/apps/cic-eth/cic_eth/admin/ctrl.py +++ b/apps/cic-eth/cic_eth/admin/ctrl.py @@ -2,7 +2,7 @@ import datetime import logging -# third-party imports +# external imports import celery from chainlib.eth.constant import ZERO_ADDRESS from chainlib.chain import ChainSpec @@ -145,3 +145,9 @@ def check_lock(chained_input, chain_spec_dict, lock_flags, address=None): session.flush() session.close() return chained_input + + +@celery_app.task() +def shutdown(message): + logg.critical('shutdown called: {}'.format(message)) + celery_app.control.shutdown() #broadcast('shutdown') diff --git a/apps/cic-eth/cic_eth/admin/token.py b/apps/cic-eth/cic_eth/admin/token.py new file mode 100644 index 00000000..ffb17568 --- /dev/null +++ b/apps/cic-eth/cic_eth/admin/token.py @@ -0,0 +1,19 @@ +# standard imports +import logging + +# external imports +import celery + +# local imports +from cic_eth.task import BaseTask + +celery_app = celery.current_app +logg = logging.getLogger() + + +@celery_app.task(bind=True, base=BaseTask) +def default_token(self): + return { + 'symbol': self.default_token_symbol, + 'address': self.default_token_address, + } diff --git a/apps/cic-eth/cic_eth/api/api_admin.py b/apps/cic-eth/cic_eth/api/api_admin.py index a0c804f2..5862dd65 100644 --- a/apps/cic-eth/cic_eth/api/api_admin.py +++ b/apps/cic-eth/cic_eth/api/api_admin.py @@ -60,6 +60,29 @@ class AdminApi: self.call_address = call_address + def proxy_do(self, chain_spec, o): + s_proxy = celery.signature( + 'cic_eth.task.rpc_proxy', + [ + chain_spec.asdict(), + o, + 'default', + ], + queue=self.queue + ) + return s_proxy.apply_async() + + + + def registry(self): + s_registry = celery.signature( + 'cic_eth.task.registry', + [], + queue=self.queue + ) + return s_registry.apply_async() + + def unlock(self, chain_spec, address, flags=None): s_unlock = celery.signature( 'cic_eth.admin.ctrl.unlock', @@ -146,7 +169,6 @@ class AdminApi: # TODO: This check should most likely be in resend task itself tx_dict = s_get_tx_cache.apply_async().get() - #if tx_dict['status'] in [StatusEnum.REVERTED, StatusEnum.SUCCESS, StatusEnum.CANCELLED, StatusEnum.OBSOLETED]: if not is_alive(getattr(StatusEnum, tx_dict['status']).value): raise TxStateChangeError('Cannot resend mined or obsoleted transaction'.format(txold_hash_hex)) @@ -226,9 +248,6 @@ class AdminApi: break last_nonce = nonce_otx - #nonce_cache = Nonce.get(address) - #nonce_w3 = self.w3.eth.getTransactionCount(address, 'pending') - return { 'nonce': { #'network': nonce_cache, @@ -272,20 +291,6 @@ class AdminApi: return s_nonce.apply_async() -# # TODO: this is a stub, complete all checks -# def ready(self): -# """Checks whether all required initializations have been performed. -# -# :raises cic_eth.error.InitializationError: At least one setting pre-requisite has not been met. -# :raises KeyError: An address provided for initialization is not known by the keystore. -# """ -# addr = AccountRole.get_address('ETH_GAS_PROVIDER_ADDRESS') -# if addr == ZERO_ADDRESS: -# raise InitializationError('missing account ETH_GAS_PROVIDER_ADDRESS') -# -# self.w3.eth.sign(addr, text='666f6f') - - def account(self, chain_spec, address, include_sender=True, include_recipient=True, renderer=None, w=sys.stdout): """Lists locally originated transactions for the given Ethereum address. @@ -348,6 +353,7 @@ class AdminApi: # TODO: Add exception upon non-existent tx aswell as invalid tx data to docstring + # TODO: This method is WAY too long def tx(self, chain_spec, tx_hash=None, tx_raw=None, registry=None, renderer=None, w=sys.stdout): """Output local and network details about a given transaction with local origin. @@ -370,7 +376,6 @@ class AdminApi: if tx_raw != None: tx_hash = add_0x(keccak256_hex_to_hex(tx_raw)) - #tx_hash = self.w3.keccak(hexstr=tx_raw).hex() s = celery.signature( 'cic_eth.queue.query.get_tx_cache', @@ -386,38 +391,78 @@ class AdminApi: source_token = None if tx['source_token'] != ZERO_ADDRESS: - try: - source_token = registry.by_address(tx['source_token']) - #source_token = CICRegistry.get_address(chain_spec, tx['source_token']).contract - except UnknownContractError: - #source_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token']) - #source_token = CICRegistry.add_token(chain_spec, source_token_contract) - logg.warning('unknown source token contract {}'.format(tx['source_token'])) + if registry != None: + try: + source_token = registry.by_address(tx['source_token']) + except UnknownContractError: + logg.warning('unknown source token contract {} (direct)'.format(tx['source_token'])) + else: + s = celery.signature( + 'cic_eth.task.registry_address_lookup', + [ + chain_spec.asdict(), + tx['source_token'], + ], + queue=self.queue + ) + t = s.apply_async() + source_token = t.get() + if source_token == None: + logg.warning('unknown source token contract {} (task pool)'.format(tx['source_token'])) + destination_token = None - if tx['source_token'] != ZERO_ADDRESS: - try: - #destination_token = CICRegistry.get_address(chain_spec, tx['destination_token']) - destination_token = registry.by_address(tx['destination_token']) - except UnknownContractError: - #destination_token_contract = self.w3.eth.contract(abi=CICRegistry.abi('ERC20'), address=tx['source_token']) - #destination_token = CICRegistry.add_token(chain_spec, destination_token_contract) - logg.warning('unknown destination token contract {}'.format(tx['destination_token'])) + if tx['destination_token'] != ZERO_ADDRESS: + if registry != None: + try: + destination_token = registry.by_address(tx['destination_token']) + except UnknownContractError: + logg.warning('unknown destination token contract {}'.format(tx['destination_token'])) + else: + s = celery.signature( + 'cic_eth.task.registry_address_lookup', + [ + chain_spec.asdict(), + tx['destination_token'], + ], + queue=self.queue + ) + t = s.apply_async() + destination_token = t.get() + if destination_token == None: + logg.warning('unknown destination token contract {} (task pool)'.format(tx['destination_token'])) + tx['sender_description'] = 'Custodial account' tx['recipient_description'] = 'Custodial account' o = code(tx['sender']) - r = self.rpc.do(o) + t = self.proxy_do(chain_spec, o) + r = t.get() if len(strip_0x(r, allow_empty=True)) > 0: - try: - #sender_contract = CICRegistry.get_address(chain_spec, tx['sender']) - sender_contract = registry.by_address(tx['sender'], sender_address=self.call_address) - tx['sender_description'] = 'Contract at {}'.format(tx['sender']) #sender_contract) - except UnknownContractError: - tx['sender_description'] = 'Unknown contract' - except KeyError as e: - tx['sender_description'] = 'Unknown contract' + if registry != None: + try: + sender_contract = registry.by_address(tx['sender'], sender_address=self.call_address) + tx['sender_description'] = 'Contract at {}'.format(tx['sender']) + except UnknownContractError: + tx['sender_description'] = 'Unknown contract' + except KeyError as e: + tx['sender_description'] = 'Unknown contract' + else: + s = celery.signature( + 'cic_eth.task.registry_address_lookup', + [ + chain_spec.asdict(), + tx['sender'], + ], + queue=self.queue + ) + t = s.apply_async() + tx['sender_description'] = t.get() + if tx['sender_description'] == None: + tx['sender_description'] = 'Unknown contract' + + else: s = celery.signature( 'cic_eth.eth.account.have', @@ -446,16 +491,31 @@ class AdminApi: tx['sender_description'] = role o = code(tx['recipient']) - r = self.rpc.do(o) + t = self.proxy_do(chain_spec, o) + r = t.get() if len(strip_0x(r, allow_empty=True)) > 0: - try: - #recipient_contract = CICRegistry.by_address(tx['recipient']) - recipient_contract = registry.by_address(tx['recipient']) - tx['recipient_description'] = 'Contract at {}'.format(tx['recipient']) #recipient_contract) - except UnknownContractError as e: - tx['recipient_description'] = 'Unknown contract' - except KeyError as e: - tx['recipient_description'] = 'Unknown contract' + if registry != None: + try: + recipient_contract = registry.by_address(tx['recipient']) + tx['recipient_description'] = 'Contract at {}'.format(tx['recipient']) + except UnknownContractError as e: + tx['recipient_description'] = 'Unknown contract' + except KeyError as e: + tx['recipient_description'] = 'Unknown contract' + else: + s = celery.signature( + 'cic_eth.task.registry_address_lookup', + [ + chain_spec.asdict(), + tx['recipient'], + ], + queue=self.queue + ) + t = s.apply_async() + tx['recipient_description'] = t.get() + if tx['recipient_description'] == None: + tx['recipient_description'] = 'Unknown contract' + else: s = celery.signature( 'cic_eth.eth.account.have', @@ -497,7 +557,8 @@ class AdminApi: r = None try: o = transaction(tx_hash) - r = self.rpc.do(o) + t = self.proxy_do(chain_spec, o) + r = t.get() if r != None: tx['network_status'] = 'Mempool' except Exception as e: @@ -506,7 +567,8 @@ class AdminApi: if r != None: try: o = receipt(tx_hash) - r = self.rpc.do(o) + t = self.proxy_do(chain_spec, o) + r = t.get() logg.debug('h {} o {}'.format(tx_hash, o)) if int(strip_0x(r['status'])) == 1: tx['network_status'] = 'Confirmed' @@ -521,11 +583,13 @@ class AdminApi: pass o = balance(tx['sender']) - r = self.rpc.do(o) + t = self.proxy_do(chain_spec, o) + r = t.get() tx['sender_gas_balance'] = r o = balance(tx['recipient']) - r = self.rpc.do(o) + t = self.proxy_do(chain_spec, o) + r = t.get() tx['recipient_gas_balance'] = r tx_unpacked = unpack(bytes.fromhex(strip_0x(tx['signed_tx'])), chain_spec) diff --git a/apps/cic-eth/cic_eth/check/redis.py b/apps/cic-eth/cic_eth/check/redis.py new file mode 100644 index 00000000..7a147fe9 --- /dev/null +++ b/apps/cic-eth/cic_eth/check/redis.py @@ -0,0 +1,18 @@ +# external imports +import redis +import os + + +def health(*args, **kwargs): + r = redis.Redis( + host=kwargs['config'].get('REDIS_HOST'), + port=kwargs['config'].get('REDIS_PORT'), + db=kwargs['config'].get('REDIS_DB'), + ) + try: + r.set(kwargs['unit'], os.getpid()) + except redis.connection.ConnectionError: + return False + except redis.connection.ResponseError: + return False + return True diff --git a/apps/cic-eth/cic_eth/error.py b/apps/cic-eth/cic_eth/error.py index 17053158..781fed85 100644 --- a/apps/cic-eth/cic_eth/error.py +++ b/apps/cic-eth/cic_eth/error.py @@ -48,6 +48,8 @@ class RoleMissingError(Exception): pass + + class IntegrityError(Exception): """Exception raised to signal irregularities with deduplication and ordering of tasks @@ -62,15 +64,19 @@ class LockedError(Exception): pass -class SignerError(Exception): +class SeppukuError(Exception): + """Exception base class for all errors that should cause system shutdown + + """ + + +class SignerError(SeppukuError): """Exception raised when signer is unavailable or generates an error """ pass -class EthError(Exception): - """Exception raised when unspecified error from evm node is encountered - +class RoleAgencyError(SeppukuError): + """Exception raise when a role cannot perform its function. This is a critical exception """ - pass diff --git a/apps/cic-eth/cic_eth/eth/account.py b/apps/cic-eth/cic_eth/eth/account.py index e35760ce..46b0060c 100644 --- a/apps/cic-eth/cic_eth/eth/account.py +++ b/apps/cic-eth/cic_eth/eth/account.py @@ -4,10 +4,10 @@ import logging # external imports import celery from erc20_single_shot_faucet import SingleShotFaucet as Faucet -from chainlib.eth.constant import ZERO_ADDRESS from hexathon import ( strip_0x, ) +from chainlib.eth.constant import ZERO_ADDRESS from chainlib.connection import RPCConnection from chainlib.eth.sign import ( new_account, @@ -19,6 +19,7 @@ from chainlib.eth.tx import ( unpack, ) from chainlib.chain import ChainSpec +from chainlib.error import JSONRPCException from eth_accounts_index import AccountRegistry from sarafu_faucet import MinterFaucet as Faucet from chainqueue.db.models.tx import TxCache @@ -70,11 +71,18 @@ def create(self, password, chain_spec_dict): a = None conn = RPCConnection.connect(chain_spec, 'signer') o = new_account() - a = conn.do(o) + try: + a = conn.do(o) + except ConnectionError as e: + raise SignerError(e) + except FileNotFoundError as e: + raise SignerError(e) conn.disconnect() + # TODO: It seems infeasible that a can be None in any case, verify if a == None: raise SignerError('create account') + logg.debug('created account {}'.format(a)) # Initialize nonce provider record for account @@ -219,21 +227,22 @@ def have(self, account, chain_spec_dict): """ chain_spec = ChainSpec.from_dict(chain_spec_dict) o = sign_message(account, '0x2a') - try: - conn = RPCConnection.connect(chain_spec, 'signer') - except Exception as e: - logg.debug('cannot sign with {}: {}'.format(account, e)) - return None + conn = RPCConnection.connect(chain_spec, 'signer') try: conn.do(o) - conn.disconnect() - return account - except Exception as e: + except ConnectionError as e: + raise SignerError(e) + except FileNotFoundError as e: + raise SignerError(e) + except JSONRPCException as e: logg.debug('cannot sign with {}: {}'.format(account, e)) conn.disconnect() return None + conn.disconnect() + return account + @celery_app.task(bind=True, base=CriticalSQLAlchemyTask) def set_role(self, tag, address, chain_spec_dict): diff --git a/apps/cic-eth/cic_eth/eth/erc20.py b/apps/cic-eth/cic_eth/eth/erc20.py index 5bde32b9..71dfd6a2 100644 --- a/apps/cic-eth/cic_eth/eth/erc20.py +++ b/apps/cic-eth/cic_eth/eth/erc20.py @@ -108,7 +108,13 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_spec_d nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session) gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas) c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle) - (tx_hash_hex, tx_signed_raw_hex) = c.transfer(t['address'], holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED) + try: + (tx_hash_hex, tx_signed_raw_hex) = c.transfer(t['address'], holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED) + except FileNotFoundError as e: + raise SignerError(e) + except ConnectionError as e: + raise SignerError(e) + rpc_signer.disconnect() rpc.disconnect() @@ -171,7 +177,12 @@ def approve(self, tokens, holder_address, spender_address, value, chain_spec_dic nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session) gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas) c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle) - (tx_hash_hex, tx_signed_raw_hex) = c.approve(t['address'], holder_address, spender_address, value, tx_format=TxFormat.RLP_SIGNED) + try: + (tx_hash_hex, tx_signed_raw_hex) = c.approve(t['address'], holder_address, spender_address, value, tx_format=TxFormat.RLP_SIGNED) + except FileNotFoundError as e: + raise SignerError(e) + except ConnectionError as e: + raise SignerError(e) rpc_signer.disconnect() rpc.disconnect() diff --git a/apps/cic-eth/cic_eth/eth/gas.py b/apps/cic-eth/cic_eth/eth/gas.py index f7b71eb9..258054c0 100644 --- a/apps/cic-eth/cic_eth/eth/gas.py +++ b/apps/cic-eth/cic_eth/eth/gas.py @@ -328,7 +328,12 @@ def refill_gas(self, recipient_address, chain_spec_dict): # build and add transaction logg.debug('tx send gas amount {} from provider {} to {}'.format(refill_amount, gas_provider, recipient_address)) - (tx_hash_hex, tx_signed_raw_hex) = c.create(gas_provider, recipient_address, refill_amount, tx_format=TxFormat.RLP_SIGNED) + try: + (tx_hash_hex, tx_signed_raw_hex) = c.create(gas_provider, recipient_address, refill_amount, tx_format=TxFormat.RLP_SIGNED) + except ConnectionError as e: + raise SignerError(e) + except FileNotFoundError as e: + raise SignerError(e) logg.debug('adding queue refill gas tx {}'.format(tx_hash_hex)) cache_task = 'cic_eth.eth.gas.cache_gas_data' register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session) @@ -404,7 +409,12 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_spec_dict, gas=None, defa c = TxFactory(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle) logg.debug('change gas price from old {} to new {} for tx {}'.format(tx['gasPrice'], new_gas_price, tx)) tx['gasPrice'] = new_gas_price - (tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx) + try: + (tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx) + except ConnectionError as e: + raise SignerError(e) + except FileNotFoundError as e: + raise SignerError(e) queue_create( chain_spec, tx['nonce'], diff --git a/apps/cic-eth/cic_eth/registry.py b/apps/cic-eth/cic_eth/registry.py index e1e481ef..2b5e2b94 100644 --- a/apps/cic-eth/cic_eth/registry.py +++ b/apps/cic-eth/cic_eth/registry.py @@ -29,5 +29,5 @@ def connect(rpc, chain_spec, registry_address): CICRegistry.address = registry_address registry = CICRegistry(chain_spec, rpc) registry_address = registry.by_name('ContractRegistry') - return registry + diff --git a/apps/cic-eth/cic_eth/runnable/ctrl.py b/apps/cic-eth/cic_eth/runnable/ctrl.py index 8f9df080..454b765d 100644 --- a/apps/cic-eth/cic_eth/runnable/ctrl.py +++ b/apps/cic-eth/cic_eth/runnable/ctrl.py @@ -23,7 +23,6 @@ default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic') argparser = argparse.ArgumentParser() argparser.add_argument('-p', '--provider', dest='p', default='http://localhost:8545', type=str, help='Web3 provider url (http only)') -argparser.add_argument('-r', '--registry-address', type=str, help='CIC registry address') argparser.add_argument('-f', '--format', dest='f', default=default_format, type=str, help='Output format') argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') diff --git a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py index 8b268d08..de6a4f27 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py @@ -11,8 +11,14 @@ import websocket # external imports import celery import confini -from chainlib.connection import RPCConnection -from chainlib.eth.connection import EthUnixSignerConnection +from chainlib.connection import ( + RPCConnection, + ConnType, + ) +from chainlib.eth.connection import ( + EthUnixSignerConnection, + EthHTTPSignerConnection, + ) from chainlib.chain import ChainSpec from chainqueue.db.models.otx import Otx from cic_eth_registry.error import UnknownContractError @@ -30,7 +36,6 @@ from cic_eth.eth import ( from cic_eth.admin import ( debug, ctrl, - token, ) from cic_eth.queue import ( query, @@ -143,8 +148,10 @@ else: }) chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) +RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, 'signer') +RPCConnection.register_constructor(ConnType.HTTP, EthHTTPSignerConnection, 'signer') +RPCConnection.register_constructor(ConnType.HTTP_SSL, EthHTTPSignerConnection, 'signer') RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default') -#RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer', constructor=EthUnixSignerConnection) RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer') Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS') @@ -152,7 +159,7 @@ Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS') #import cic_eth.checks.gas #if not cic_eth.checks.gas.health(config=config): # raise RuntimeError() -liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config) +liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config, unit='cic-eth-tasker') def main(): argv = ['worker'] @@ -195,11 +202,12 @@ def main(): BaseTask.default_token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL') BaseTask.default_token_address = registry.by_name(BaseTask.default_token_symbol) + BaseTask.run_dir = config.get('CIC_RUN_DIR') logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address)) - liveness.linux.set() + liveness.linux.set(rundir=config.get('CIC_RUN_DIR')) current_app.worker_main(argv) - liveness.linux.reset() + liveness.linux.reset(rundir=config.get('CIC_RUN_DIR')) @celery.signals.eventlet_pool_postshutdown.connect diff --git a/apps/cic-eth/cic_eth/runnable/info.py b/apps/cic-eth/cic_eth/runnable/info.py new file mode 100644 index 00000000..59bfb759 --- /dev/null +++ b/apps/cic-eth/cic_eth/runnable/info.py @@ -0,0 +1,65 @@ +#!python3 + +# SPDX-License-Identifier: GPL-3.0-or-later + +# standard imports +import logging +import argparse +import os + +# external imports +import confini +import celery + +# local imports +from cic_eth.api import Api + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +default_format = 'terminal' +default_config_dir = os.environ.get('CONFINI_DIR', '/usr/local/etc/cic') + + +argparser = argparse.ArgumentParser() +argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') +argparser.add_argument('-c', type=str, default=default_config_dir, help='config root to use') +argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit transaction tasks to') +argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') +argparser.add_argument('-v', action='store_true', help='Be verbose') +argparser.add_argument('-vv', help='be more verbose', action='store_true') +args = argparser.parse_args() + +if args.v == True: + logging.getLogger().setLevel(logging.INFO) +elif args.vv == True: + logging.getLogger().setLevel(logging.DEBUG) + +config_dir = os.path.join(args.c) +os.makedirs(config_dir, 0o777, True) +config = confini.Config(config_dir, args.env_prefix) +config.process() +args_override = { + 'CIC_CHAIN_SPEC': getattr(args, 'i'), + } +config.dict_override(args_override, 'cli args') +config.censor('PASSWORD', 'DATABASE') +config.censor('PASSWORD', 'SSL') +logg.debug('config loaded from {}:\n{}'.format(config_dir, config)) + + +celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) + +queue = args.q + +api = Api(config.get('CIC_CHAIN_SPEC'), queue=queue) + +def main(): + t = api.default_token() + token_info = t.get() + print('Default token symbol: {}'.format(token_info['symbol'])) + print('Default token address: {}'.format(token_info['address'])) + + +if __name__ == '__main__': + main() diff --git a/apps/cic-eth/cic_eth/runnable/transfer.py b/apps/cic-eth/cic_eth/runnable/transfer.py index 6f408785..c63c72a7 100644 --- a/apps/cic-eth/cic_eth/runnable/transfer.py +++ b/apps/cic-eth/cic_eth/runnable/transfer.py @@ -85,9 +85,6 @@ def main(): callback_queue=args.q, ) - #register = not args.no_register - #logg.debug('register {}'.format(register)) - #t = api.create_account(register=register) t = api.transfer(config.get('_SENDER'), config.get('_RECIPIENT'), config.get('_VALUE'), config.get('_SYMBOL')) ps.get_message() diff --git a/apps/cic-eth/cic_eth/runnable/view.py b/apps/cic-eth/cic_eth/runnable/view.py index 50adce37..ef08e2a4 100644 --- a/apps/cic-eth/cic_eth/runnable/view.py +++ b/apps/cic-eth/cic_eth/runnable/view.py @@ -81,10 +81,14 @@ chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) rpc = EthHTTPConnection(args.p) -registry_address = config.get('CIC_REGISTRY_ADDRESS') +#registry_address = config.get('CIC_REGISTRY_ADDRESS') admin_api = AdminApi(rpc) +t = admin_api.registry() +registry_address = t.get() +logg.info('got registry address from task pool: {}'.format(registry_address)) + trusted_addresses_src = config.get('CIC_TRUST_ADDRESS') if trusted_addresses_src == None: logg.critical('At least one trusted address must be declared in CIC_TRUST_ADDRESS') @@ -151,14 +155,16 @@ def main(): txs = [] renderer = render_tx if len(config.get('_QUERY')) > 66: - registry = connect_registry(rpc, chain_spec, registry_address) - admin_api.tx(chain_spec, tx_raw=config.get('_QUERY'), registry=registry, renderer=renderer) + #registry = connect_registry(rpc, chain_spec, registry_address) + #admin_api.tx(chain_spec, tx_raw=config.get('_QUERY'), registry=registry, renderer=renderer) + admin_api.tx(chain_spec, tx_raw=config.get('_QUERY'), renderer=renderer) elif len(config.get('_QUERY')) > 42: - registry = connect_registry(rpc, chain_spec, registry_address) - admin_api.tx(chain_spec, tx_hash=config.get('_QUERY'), registry=registry, renderer=renderer) + #registry = connect_registry(rpc, chain_spec, registry_address) + #admin_api.tx(chain_spec, tx_hash=config.get('_QUERY'), registry=registry, renderer=renderer) + admin_api.tx(chain_spec, tx_hash=config.get('_QUERY'), renderer=renderer) elif len(config.get('_QUERY')) == 42: - registry = connect_registry(rpc, chain_spec, registry_address) + #registry = connect_registry(rpc, chain_spec, registry_address) txs = admin_api.account(chain_spec, config.get('_QUERY'), include_recipient=False, renderer=render_account) renderer = render_account elif len(config.get('_QUERY')) >= 4 and config.get('_QUERY')[:4] == 'lock': diff --git a/apps/cic-eth/cic_eth/task.py b/apps/cic-eth/cic_eth/task.py index 4c4a7f78..d0d41d47 100644 --- a/apps/cic-eth/cic_eth/task.py +++ b/apps/cic-eth/cic_eth/task.py @@ -7,18 +7,20 @@ import uuid # external imports import celery import sqlalchemy +from chainlib.chain import ChainSpec +from chainlib.connection import RPCConnection from chainlib.eth.constant import ZERO_ADDRESS from chainlib.eth.nonce import RPCNonceOracle from chainlib.eth.gas import RPCGasOracle +from cic_eth_registry import CICRegistry +from cic_eth_registry.error import UnknownContractError +import liveness.linux # local imports -from cic_eth.error import ( - SignerError, - EthError, - ) +from cic_eth.error import SeppukuError from cic_eth.db.models.base import SessionBase -logg = logging.getLogger(__name__) +logg = logging.getLogger().getChild(__name__) celery_app = celery.current_app @@ -31,6 +33,7 @@ class BaseTask(celery.Task): create_gas_oracle = RPCGasOracle default_token_address = None default_token_symbol = None + run_dir = '/run' def create_session(self): return BaseTask.session_func() @@ -40,6 +43,19 @@ class BaseTask(celery.Task): logg.debug('task {} root uuid {}'.format(self.__class__.__name__, self.request.root_id)) return + + def on_failure(self, exc, task_id, args, kwargs, einfo): + if isinstance(exc, SeppukuError): + liveness.linux.reset(rundir=self.run_dir) + logg.critical(einfo) + msg = 'received critical exception {}, calling shutdown'.format(str(exc)) + s = celery.signature( + 'cic_eth.admin.ctrl.shutdown', + [msg], + queue=self.request.delivery_info.get('routing_key'), + ) + s.apply_async() + class CriticalTask(BaseTask): retry_jitter = True @@ -69,7 +85,6 @@ class CriticalSQLAlchemyAndWeb3Task(CriticalTask): sqlalchemy.exc.TimeoutError, requests.exceptions.ConnectionError, sqlalchemy.exc.ResourceClosedError, - EthError, ) safe_gas_threshold_amount = 2000000000 * 60000 * 3 safe_gas_refill_amount = safe_gas_threshold_amount * 5 @@ -80,24 +95,45 @@ class CriticalSQLAlchemyAndSignerTask(CriticalTask): sqlalchemy.exc.DatabaseError, sqlalchemy.exc.TimeoutError, sqlalchemy.exc.ResourceClosedError, - SignerError, ) class CriticalWeb3AndSignerTask(CriticalTask): autoretry_for = ( requests.exceptions.ConnectionError, - SignerError, ) safe_gas_threshold_amount = 2000000000 * 60000 * 3 safe_gas_refill_amount = safe_gas_threshold_amount * 5 -@celery_app.task(bind=True, base=BaseTask) -def hello(self): - time.sleep(0.1) - return id(SessionBase.create_session) +@celery_app.task() +def check_health(self): + pass + + +# TODO: registry / rpc methods should perhaps be moved to better named module +@celery_app.task() +def registry(): + return CICRegistry.address @celery_app.task() -def check_health(self): - celery.app.control.shutdown() +def registry_address_lookup(chain_spec_dict, address, connection_tag='default'): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + conn = RPCConnection.connect(chain_spec, tag=connection_tag) + registry = CICRegistry(chain_spec, conn) + return registry.by_address(address) + + +@celery_app.task(throws=(UnknownContractError,)) +def registry_name_lookup(chain_spec_dict, name, connection_tag='default'): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + conn = RPCConnection.connect(chain_spec, tag=connection_tag) + registry = CICRegistry(chain_spec, conn) + return registry.by_name(name) + + +@celery_app.task() +def rpc_proxy(chain_spec_dict, o, connection_tag='default'): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + conn = RPCConnection.connect(chain_spec, tag=connection_tag) + return conn.do(o) diff --git a/apps/cic-eth/cic_eth/version.py b/apps/cic-eth/cic_eth/version.py index 4740b580..a57e9bef 100644 --- a/apps/cic-eth/cic_eth/version.py +++ b/apps/cic-eth/cic_eth/version.py @@ -10,7 +10,7 @@ version = ( 0, 11, 0, - 'beta.8', + 'beta.11', ) version_object = semver.VersionInfo( diff --git a/apps/cic-eth/config/cic.ini b/apps/cic-eth/config/cic.ini index bce15e14..d3840fac 100644 --- a/apps/cic-eth/config/cic.ini +++ b/apps/cic-eth/config/cic.ini @@ -4,5 +4,5 @@ chain_spec = evm:bloxberg:8996 tx_retry_delay = trust_address = default_token_symbol = GFT -health_modules = cic_eth.check.db,cic_eth.check.signer,cic_eth.check.gas +health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas run_dir = /run diff --git a/apps/cic-eth/config/docker/cic.ini b/apps/cic-eth/config/docker/cic.ini index e4b9e0bd..4fdefca0 100644 --- a/apps/cic-eth/config/docker/cic.ini +++ b/apps/cic-eth/config/docker/cic.ini @@ -4,5 +4,5 @@ chain_spec = evm:bloxberg:8996 trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C tx_retry_delay = 20 default_token_symbol = GFT -health_modules = cic_eth.check.db,cic_eth.check.signer,cic_eth.check.gas +health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas run_dir = /run diff --git a/apps/cic-eth/config/docker/eth.ini b/apps/cic-eth/config/docker/eth.ini index 9d6b6607..534a2eea 100644 --- a/apps/cic-eth/config/docker/eth.ini +++ b/apps/cic-eth/config/docker/eth.ini @@ -1,4 +1,3 @@ [eth] provider = http://localhost:63545 -health_modules = cic_eth.check.db,cic_eth.check.gas gas_gifter_minimum_balance = 10000000000000000000000 diff --git a/apps/cic-eth/config/eth.ini b/apps/cic-eth/config/eth.ini index 3b41af90..3c589cb2 100644 --- a/apps/cic-eth/config/eth.ini +++ b/apps/cic-eth/config/eth.ini @@ -1,4 +1,3 @@ [eth] provider = http://localhost:8545 gas_gifter_minimum_balance = 10000000000000000000000 -health_modules = cic_eth.check.db,cic_eth.check.gas diff --git a/apps/cic-eth/requirements.txt b/apps/cic-eth/requirements.txt index 44d8b272..c923ada2 100644 --- a/apps/cic-eth/requirements.txt +++ b/apps/cic-eth/requirements.txt @@ -1,4 +1,4 @@ -cic-base==0.1.2b1 +cic-base==0.1.2b3 celery==4.4.7 crypto-dev-signer~=0.4.14b3 confini~=0.3.6rc3 @@ -15,7 +15,7 @@ semver==2.13.0 websocket-client==0.57.0 moolb~=0.1.1b2 eth-address-index~=0.1.1a9 -chainlib~=0.0.2a13 +chainlib~=0.0.2a18 hexathon~=0.0.1a7 chainsyncer[sql]~=0.0.2a2 chainqueue~=0.0.1a7 diff --git a/apps/cic-eth/tests/conftest.py b/apps/cic-eth/tests/conftest.py index 5c4c80d7..eca7cc8c 100644 --- a/apps/cic-eth/tests/conftest.py +++ b/apps/cic-eth/tests/conftest.py @@ -3,8 +3,12 @@ import os import sys import logging +# external imports +from chainlib.eth.erc20 import ERC20 + # local imports from cic_eth.api import Api +from cic_eth.task import BaseTask script_dir = os.path.dirname(os.path.realpath(__file__)) root_dir = os.path.dirname(script_dir) @@ -28,3 +32,26 @@ def api( ): chain_str = str(default_chain_spec) return Api(chain_str, queue=None, callback_param='foo') + + +@pytest.fixture(scope='function') +def foo_token_symbol( + default_chain_spec, + foo_token, + eth_rpc, + contract_roles, + ): + + c = ERC20(default_chain_spec) + o = c.symbol(foo_token, sender_address=contract_roles['CONTRACT_DEPLOYER']) + r = eth_rpc.do(o) + return c.parse_symbol(r) + + +@pytest.fixture(scope='function') +def default_token( + foo_token, + foo_token_symbol, + ): + BaseTask.default_token_symbol = foo_token_symbol + BaseTask.default_token_address = foo_token diff --git a/apps/cic-eth/tests/fixtures_celery.py b/apps/cic-eth/tests/fixtures_celery.py index 33cadc31..26f640d3 100644 --- a/apps/cic-eth/tests/fixtures_celery.py +++ b/apps/cic-eth/tests/fixtures_celery.py @@ -34,6 +34,7 @@ def celery_includes(): 'cic_eth.admin.ctrl', 'cic_eth.admin.nonce', 'cic_eth.admin.debug', + 'cic_eth.admin.token', 'cic_eth.eth.account', 'cic_eth.callbacks.noop', 'cic_eth.callbacks.http', diff --git a/apps/cic-eth/tests/unit/admin/test_default_token.py b/apps/cic-eth/tests/unit/admin/test_default_token.py new file mode 100644 index 00000000..d49ed3fe --- /dev/null +++ b/apps/cic-eth/tests/unit/admin/test_default_token.py @@ -0,0 +1,21 @@ +# external imports +import celery + + +def test_default_token( + default_token, + celery_session_worker, + foo_token, + foo_token_symbol, + ): + + s = celery.signature( + 'cic_eth.admin.token.default_token', + [], + queue=None, + ) + t = s.apply_async() + r = t.get() + + assert r['address'] == foo_token + assert r['symbol'] == foo_token_symbol diff --git a/apps/cic-ussd/requirements.txt b/apps/cic-ussd/requirements.txt index 36aa04b5..793d5021 100644 --- a/apps/cic-ussd/requirements.txt +++ b/apps/cic-ussd/requirements.txt @@ -1,4 +1,4 @@ -cic_base[full_graph]~=0.1.2a79 -cic-eth~=0.11.0b7 +cic_base[full_graph]~=0.1.2b2 +cic-eth~=0.11.0b9 cic-notify~=0.4.0a4 cic-types~=0.1.0a10 diff --git a/apps/contract-migration/scripts/requirements.txt b/apps/contract-migration/scripts/requirements.txt index 4d02487e..8c055b61 100644 --- a/apps/contract-migration/scripts/requirements.txt +++ b/apps/contract-migration/scripts/requirements.txt @@ -1,5 +1,5 @@ -cic-base[full_graph]==0.1.2a77 +cic-base[full_graph]==0.1.2b2 sarafu-faucet==0.0.2a28 -cic-eth==0.11.0b6 +cic-eth==0.11.0b10 cic-types==0.1.0a10 -crypto-dev-signer==0.4.14b2 +crypto-dev-signer==0.4.14b3 diff --git a/apps/util/liveness/liveness/linux.py b/apps/util/liveness/liveness/linux.py index 7ff754f0..52d4b24c 100644 --- a/apps/util/liveness/liveness/linux.py +++ b/apps/util/liveness/liveness/linux.py @@ -42,6 +42,7 @@ def load(check_strs, namespace=default_namespace, rundir='/run', *args, **kwargs def set(error=0, namespace=default_namespace, rundir='/run'): + logg.info('liveness SET error {} for namespace {}'.format(error, namespace)) app_rundir = os.path.join(rundir, namespace) f = open(os.path.join(app_rundir, 'error'), 'w') f.write(str(error)) @@ -49,6 +50,13 @@ def set(error=0, namespace=default_namespace, rundir='/run'): def reset(namespace=default_namespace, rundir='/run'): + logg.info('liveness RESET for namespace {}'.format(namespace)) app_rundir = os.path.join(rundir, namespace) - os.unlink(os.path.join(app_rundir, 'pid')) - os.unlink(os.path.join(app_rundir, 'error')) + try: + os.unlink(os.path.join(app_rundir, 'pid')) + except FileNotFoundError: + pass + try: + os.unlink(os.path.join(app_rundir, 'error')) + except FileNotFoundError: + pass diff --git a/docker-compose.yml b/docker-compose.yml index 26681d8f..a3abb776 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -240,6 +240,8 @@ services: DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2} DATABASE_DEBUG: ${DATABASE_DEBUG:-0} DATABASE_POOL_SIZE: 0 + REDIS_PORT: 6379 + REDIS_HOST: redis PGPASSWORD: ${DATABASE_PASSWORD:-tralala} CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-evm:bloxberg:8996} BANCOR_DIR: ${BANCOR_DIR:-/usr/local/share/cic/bancor}