diff --git a/apps/cic-eth/cic_eth/admin/ctrl.py b/apps/cic-eth/cic_eth/admin/ctrl.py index 0b55ffc5..af6dd647 100644 --- a/apps/cic-eth/cic_eth/admin/ctrl.py +++ b/apps/cic-eth/cic_eth/admin/ctrl.py @@ -2,7 +2,7 @@ import datetime import logging -# third-party imports +# external imports import celery from chainlib.eth.constant import ZERO_ADDRESS from chainlib.chain import ChainSpec @@ -145,3 +145,9 @@ def check_lock(chained_input, chain_spec_dict, lock_flags, address=None): session.flush() session.close() return chained_input + + +@celery_app.task() +def shutdown(message): + logg.critical('shutdown called: {}'.format(message)) + celery_app.control.shutdown() #broadcast('shutdown') diff --git a/apps/cic-eth/cic_eth/check/redis.py b/apps/cic-eth/cic_eth/check/redis.py new file mode 100644 index 00000000..7a147fe9 --- /dev/null +++ b/apps/cic-eth/cic_eth/check/redis.py @@ -0,0 +1,18 @@ +# external imports +import redis +import os + + +def health(*args, **kwargs): + r = redis.Redis( + host=kwargs['config'].get('REDIS_HOST'), + port=kwargs['config'].get('REDIS_PORT'), + db=kwargs['config'].get('REDIS_DB'), + ) + try: + r.set(kwargs['unit'], os.getpid()) + except redis.connection.ConnectionError: + return False + except redis.connection.ResponseError: + return False + return True diff --git a/apps/cic-eth/cic_eth/error.py b/apps/cic-eth/cic_eth/error.py index 17053158..781fed85 100644 --- a/apps/cic-eth/cic_eth/error.py +++ b/apps/cic-eth/cic_eth/error.py @@ -48,6 +48,8 @@ class RoleMissingError(Exception): pass + + class IntegrityError(Exception): """Exception raised to signal irregularities with deduplication and ordering of tasks @@ -62,15 +64,19 @@ class LockedError(Exception): pass -class SignerError(Exception): +class SeppukuError(Exception): + """Exception base class for all errors that should cause system shutdown + + """ + + +class SignerError(SeppukuError): """Exception raised when signer is unavailable or generates an error """ pass -class EthError(Exception): - """Exception raised when unspecified error from evm node is encountered - +class RoleAgencyError(SeppukuError): + """Exception raise when a role cannot perform its function. This is a critical exception """ - pass diff --git a/apps/cic-eth/cic_eth/eth/account.py b/apps/cic-eth/cic_eth/eth/account.py index e35760ce..46b0060c 100644 --- a/apps/cic-eth/cic_eth/eth/account.py +++ b/apps/cic-eth/cic_eth/eth/account.py @@ -4,10 +4,10 @@ import logging # external imports import celery from erc20_single_shot_faucet import SingleShotFaucet as Faucet -from chainlib.eth.constant import ZERO_ADDRESS from hexathon import ( strip_0x, ) +from chainlib.eth.constant import ZERO_ADDRESS from chainlib.connection import RPCConnection from chainlib.eth.sign import ( new_account, @@ -19,6 +19,7 @@ from chainlib.eth.tx import ( unpack, ) from chainlib.chain import ChainSpec +from chainlib.error import JSONRPCException from eth_accounts_index import AccountRegistry from sarafu_faucet import MinterFaucet as Faucet from chainqueue.db.models.tx import TxCache @@ -70,11 +71,18 @@ def create(self, password, chain_spec_dict): a = None conn = RPCConnection.connect(chain_spec, 'signer') o = new_account() - a = conn.do(o) + try: + a = conn.do(o) + except ConnectionError as e: + raise SignerError(e) + except FileNotFoundError as e: + raise SignerError(e) conn.disconnect() + # TODO: It seems infeasible that a can be None in any case, verify if a == None: raise SignerError('create account') + logg.debug('created account {}'.format(a)) # Initialize nonce provider record for account @@ -219,21 +227,22 @@ def have(self, account, chain_spec_dict): """ chain_spec = ChainSpec.from_dict(chain_spec_dict) o = sign_message(account, '0x2a') - try: - conn = RPCConnection.connect(chain_spec, 'signer') - except Exception as e: - logg.debug('cannot sign with {}: {}'.format(account, e)) - return None + conn = RPCConnection.connect(chain_spec, 'signer') try: conn.do(o) - conn.disconnect() - return account - except Exception as e: + except ConnectionError as e: + raise SignerError(e) + except FileNotFoundError as e: + raise SignerError(e) + except JSONRPCException as e: logg.debug('cannot sign with {}: {}'.format(account, e)) conn.disconnect() return None + conn.disconnect() + return account + @celery_app.task(bind=True, base=CriticalSQLAlchemyTask) def set_role(self, tag, address, chain_spec_dict): diff --git a/apps/cic-eth/cic_eth/eth/erc20.py b/apps/cic-eth/cic_eth/eth/erc20.py index 5bde32b9..71dfd6a2 100644 --- a/apps/cic-eth/cic_eth/eth/erc20.py +++ b/apps/cic-eth/cic_eth/eth/erc20.py @@ -108,7 +108,13 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_spec_d nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session) gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas) c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle) - (tx_hash_hex, tx_signed_raw_hex) = c.transfer(t['address'], holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED) + try: + (tx_hash_hex, tx_signed_raw_hex) = c.transfer(t['address'], holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED) + except FileNotFoundError as e: + raise SignerError(e) + except ConnectionError as e: + raise SignerError(e) + rpc_signer.disconnect() rpc.disconnect() @@ -171,7 +177,12 @@ def approve(self, tokens, holder_address, spender_address, value, chain_spec_dic nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session) gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas) c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle) - (tx_hash_hex, tx_signed_raw_hex) = c.approve(t['address'], holder_address, spender_address, value, tx_format=TxFormat.RLP_SIGNED) + try: + (tx_hash_hex, tx_signed_raw_hex) = c.approve(t['address'], holder_address, spender_address, value, tx_format=TxFormat.RLP_SIGNED) + except FileNotFoundError as e: + raise SignerError(e) + except ConnectionError as e: + raise SignerError(e) rpc_signer.disconnect() rpc.disconnect() diff --git a/apps/cic-eth/cic_eth/eth/gas.py b/apps/cic-eth/cic_eth/eth/gas.py index f7b71eb9..258054c0 100644 --- a/apps/cic-eth/cic_eth/eth/gas.py +++ b/apps/cic-eth/cic_eth/eth/gas.py @@ -328,7 +328,12 @@ def refill_gas(self, recipient_address, chain_spec_dict): # build and add transaction logg.debug('tx send gas amount {} from provider {} to {}'.format(refill_amount, gas_provider, recipient_address)) - (tx_hash_hex, tx_signed_raw_hex) = c.create(gas_provider, recipient_address, refill_amount, tx_format=TxFormat.RLP_SIGNED) + try: + (tx_hash_hex, tx_signed_raw_hex) = c.create(gas_provider, recipient_address, refill_amount, tx_format=TxFormat.RLP_SIGNED) + except ConnectionError as e: + raise SignerError(e) + except FileNotFoundError as e: + raise SignerError(e) logg.debug('adding queue refill gas tx {}'.format(tx_hash_hex)) cache_task = 'cic_eth.eth.gas.cache_gas_data' register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session) @@ -404,7 +409,12 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_spec_dict, gas=None, defa c = TxFactory(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle) logg.debug('change gas price from old {} to new {} for tx {}'.format(tx['gasPrice'], new_gas_price, tx)) tx['gasPrice'] = new_gas_price - (tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx) + try: + (tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx) + except ConnectionError as e: + raise SignerError(e) + except FileNotFoundError as e: + raise SignerError(e) queue_create( chain_spec, tx['nonce'], diff --git a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py index a3e7d467..d6402ea6 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/tasker.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/tasker.py @@ -16,9 +16,9 @@ from chainlib.connection import ( ConnType, ) from chainlib.eth.connection import ( - EthUnixSignerConnection, - EthHTTPSignerConnection, - ) + EthUnixSignerConnection, + EthHTTPSignerConnection, + ) from chainlib.chain import ChainSpec from chainqueue.db.models.otx import Otx from cic_eth_registry.error import UnknownContractError @@ -149,11 +149,17 @@ else: }) chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) +RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, 'signer') +RPCConnection.register_constructor(ConnType.HTTP, EthHTTPSignerConnection, 'signer') +RPCConnection.register_constructor(ConnType.HTTP_SSL, EthHTTPSignerConnection, 'signer') RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default') +<<<<<<< HEAD #RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer', constructor=EthUnixSignerConnection) RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, tag='signer') RPCConnection.register_constructor(ConnType.HTTP, EthHTTPSignerConnection, tag='signer') RPCConnection.register_constructor(ConnType.HTTP_SSL, EthHTTPSignerConnection, tag='signer') +======= +>>>>>>> origin/master RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer') Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS') @@ -161,7 +167,7 @@ Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS') #import cic_eth.checks.gas #if not cic_eth.checks.gas.health(config=config): # raise RuntimeError() -liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config) +liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config, unit='cic-eth-tasker') def main(): argv = ['worker'] @@ -204,11 +210,12 @@ def main(): BaseTask.default_token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL') BaseTask.default_token_address = registry.by_name(BaseTask.default_token_symbol) + BaseTask.run_dir = config.get('CIC_RUN_DIR') logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address)) - liveness.linux.set() + liveness.linux.set(rundir=config.get('CIC_RUN_DIR')) current_app.worker_main(argv) - liveness.linux.reset() + liveness.linux.reset(rundir=config.get('CIC_RUN_DIR')) @celery.signals.eventlet_pool_postshutdown.connect diff --git a/apps/cic-eth/cic_eth/task.py b/apps/cic-eth/cic_eth/task.py index 4c4a7f78..3fe201f6 100644 --- a/apps/cic-eth/cic_eth/task.py +++ b/apps/cic-eth/cic_eth/task.py @@ -10,15 +10,13 @@ import sqlalchemy from chainlib.eth.constant import ZERO_ADDRESS from chainlib.eth.nonce import RPCNonceOracle from chainlib.eth.gas import RPCGasOracle +import liveness.linux # local imports -from cic_eth.error import ( - SignerError, - EthError, - ) +from cic_eth.error import SeppukuError from cic_eth.db.models.base import SessionBase -logg = logging.getLogger(__name__) +logg = logging.getLogger().getChild(__name__) celery_app = celery.current_app @@ -31,6 +29,7 @@ class BaseTask(celery.Task): create_gas_oracle = RPCGasOracle default_token_address = None default_token_symbol = None + run_dir = '/run' def create_session(self): return BaseTask.session_func() @@ -40,6 +39,19 @@ class BaseTask(celery.Task): logg.debug('task {} root uuid {}'.format(self.__class__.__name__, self.request.root_id)) return + + def on_failure(self, exc, task_id, args, kwargs, einfo): + if isinstance(exc, SeppukuError): + liveness.linux.reset(rundir=self.run_dir) + logg.critical(einfo) + msg = 'received critical exception {}, calling shutdown'.format(str(exc)) + s = celery.signature( + 'cic_eth.admin.ctrl.shutdown', + [msg], + queue=self.request.delivery_info.get('routing_key'), + ) + s.apply_async() + class CriticalTask(BaseTask): retry_jitter = True @@ -69,7 +81,6 @@ class CriticalSQLAlchemyAndWeb3Task(CriticalTask): sqlalchemy.exc.TimeoutError, requests.exceptions.ConnectionError, sqlalchemy.exc.ResourceClosedError, - EthError, ) safe_gas_threshold_amount = 2000000000 * 60000 * 3 safe_gas_refill_amount = safe_gas_threshold_amount * 5 @@ -80,13 +91,11 @@ class CriticalSQLAlchemyAndSignerTask(CriticalTask): sqlalchemy.exc.DatabaseError, sqlalchemy.exc.TimeoutError, sqlalchemy.exc.ResourceClosedError, - SignerError, ) class CriticalWeb3AndSignerTask(CriticalTask): autoretry_for = ( requests.exceptions.ConnectionError, - SignerError, ) safe_gas_threshold_amount = 2000000000 * 60000 * 3 safe_gas_refill_amount = safe_gas_threshold_amount * 5 @@ -100,4 +109,4 @@ def hello(self): @celery_app.task() def check_health(self): - celery.app.control.shutdown() + pass diff --git a/apps/cic-eth/config/cic.ini b/apps/cic-eth/config/cic.ini index bce15e14..d3840fac 100644 --- a/apps/cic-eth/config/cic.ini +++ b/apps/cic-eth/config/cic.ini @@ -4,5 +4,5 @@ chain_spec = evm:bloxberg:8996 tx_retry_delay = trust_address = default_token_symbol = GFT -health_modules = cic_eth.check.db,cic_eth.check.signer,cic_eth.check.gas +health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas run_dir = /run diff --git a/apps/cic-eth/config/docker/cic.ini b/apps/cic-eth/config/docker/cic.ini index e4b9e0bd..4fdefca0 100644 --- a/apps/cic-eth/config/docker/cic.ini +++ b/apps/cic-eth/config/docker/cic.ini @@ -4,5 +4,5 @@ chain_spec = evm:bloxberg:8996 trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C tx_retry_delay = 20 default_token_symbol = GFT -health_modules = cic_eth.check.db,cic_eth.check.signer,cic_eth.check.gas +health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas run_dir = /run diff --git a/apps/util/liveness/liveness/linux.py b/apps/util/liveness/liveness/linux.py index 7ff754f0..52d4b24c 100644 --- a/apps/util/liveness/liveness/linux.py +++ b/apps/util/liveness/liveness/linux.py @@ -42,6 +42,7 @@ def load(check_strs, namespace=default_namespace, rundir='/run', *args, **kwargs def set(error=0, namespace=default_namespace, rundir='/run'): + logg.info('liveness SET error {} for namespace {}'.format(error, namespace)) app_rundir = os.path.join(rundir, namespace) f = open(os.path.join(app_rundir, 'error'), 'w') f.write(str(error)) @@ -49,6 +50,13 @@ def set(error=0, namespace=default_namespace, rundir='/run'): def reset(namespace=default_namespace, rundir='/run'): + logg.info('liveness RESET for namespace {}'.format(namespace)) app_rundir = os.path.join(rundir, namespace) - os.unlink(os.path.join(app_rundir, 'pid')) - os.unlink(os.path.join(app_rundir, 'error')) + try: + os.unlink(os.path.join(app_rundir, 'pid')) + except FileNotFoundError: + pass + try: + os.unlink(os.path.join(app_rundir, 'error')) + except FileNotFoundError: + pass diff --git a/docker-compose.yml b/docker-compose.yml index 26681d8f..a3abb776 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -240,6 +240,8 @@ services: DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2} DATABASE_DEBUG: ${DATABASE_DEBUG:-0} DATABASE_POOL_SIZE: 0 + REDIS_PORT: 6379 + REDIS_HOST: redis PGPASSWORD: ${DATABASE_PASSWORD:-tralala} CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-evm:bloxberg:8996} BANCOR_DIR: ${BANCOR_DIR:-/usr/local/share/cic/bancor}