Add health check for signer, redis, shutdown on missing signer

This commit is contained in:
nolash 2021-04-24 08:03:49 +02:00
parent 3ed263a879
commit 1110e79e5e
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
9 changed files with 64 additions and 25 deletions

View File

@ -2,7 +2,7 @@
import datetime import datetime
import logging import logging
# third-party imports # external imports
import celery import celery
from chainlib.eth.constant import ZERO_ADDRESS from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
@ -145,3 +145,9 @@ def check_lock(chained_input, chain_spec_dict, lock_flags, address=None):
session.flush() session.flush()
session.close() session.close()
return chained_input return chained_input
@celery_app.task()
def shutdown(message):
logg.critical('shutdown called: {}'.format(message))
celery_app.control.shutdown() #broadcast('shutdown')

View File

@ -48,6 +48,8 @@ class RoleMissingError(Exception):
pass pass
class IntegrityError(Exception): class IntegrityError(Exception):
"""Exception raised to signal irregularities with deduplication and ordering of tasks """Exception raised to signal irregularities with deduplication and ordering of tasks
@ -62,15 +64,19 @@ class LockedError(Exception):
pass pass
class SignerError(Exception): class SeppukuError(Exception):
"""Exception base class for all errors that should cause system shutdown
"""
class SignerError(SeppukuError):
"""Exception raised when signer is unavailable or generates an error """Exception raised when signer is unavailable or generates an error
""" """
pass pass
class EthError(Exception): class RoleAgencyError(SeppukuError):
"""Exception raised when unspecified error from evm node is encountered """Exception raise when a role cannot perform its function. This is a critical exception
""" """
pass

View File

@ -70,11 +70,18 @@ def create(self, password, chain_spec_dict):
a = None a = None
conn = RPCConnection.connect(chain_spec, 'signer') conn = RPCConnection.connect(chain_spec, 'signer')
o = new_account() o = new_account()
try:
a = conn.do(o) a = conn.do(o)
except ConnectionError as e:
raise SignerError(e)
except FileNotFoundError as e:
raise SignerError(e)
conn.disconnect() conn.disconnect()
# TODO: It seems infeasible that a can be None in any case, verify
if a == None: if a == None:
raise SignerError('create account') raise SignerError('create account')
logg.debug('created account {}'.format(a)) logg.debug('created account {}'.format(a))
# Initialize nonce provider record for account # Initialize nonce provider record for account

View File

@ -108,7 +108,13 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_spec_d
nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session) nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas) gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas)
c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle) c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
try:
(tx_hash_hex, tx_signed_raw_hex) = c.transfer(t['address'], holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = c.transfer(t['address'], holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED)
except FileNotFoundError as e:
raise SignerError(e)
except ConnectionError as e:
raise SignerError(e)
rpc_signer.disconnect() rpc_signer.disconnect()
rpc.disconnect() rpc.disconnect()

View File

@ -11,8 +11,14 @@ import websocket
# external imports # external imports
import celery import celery
import confini import confini
from chainlib.connection import RPCConnection from chainlib.connection import (
from chainlib.eth.connection import EthUnixSignerConnection RPCConnection,
ConnType,
)
from chainlib.eth.connection import (
EthUnixSignerConnection,
EthHTTPSignerConnection,
)
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainqueue.db.models.otx import Otx from chainqueue.db.models.otx import Otx
from cic_eth_registry.error import UnknownContractError from cic_eth_registry.error import UnknownContractError
@ -143,8 +149,10 @@ else:
}) })
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, 'signer')
RPCConnection.register_constructor(ConnType.HTTP, EthHTTPSignerConnection, 'signer')
RPCConnection.register_constructor(ConnType.HTTP_SSL, EthHTTPSignerConnection, 'signer')
RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default') RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
#RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer', constructor=EthUnixSignerConnection)
RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer') RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer')
Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS') Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS')
@ -152,7 +160,7 @@ Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS')
#import cic_eth.checks.gas #import cic_eth.checks.gas
#if not cic_eth.checks.gas.health(config=config): #if not cic_eth.checks.gas.health(config=config):
# raise RuntimeError() # raise RuntimeError()
liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config) liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config, unit='cic-eth-tasker')
def main(): def main():
argv = ['worker'] argv = ['worker']
@ -198,9 +206,9 @@ def main():
BaseTask.default_token_address = registry.by_name(BaseTask.default_token_symbol) BaseTask.default_token_address = registry.by_name(BaseTask.default_token_symbol)
logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address)) logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address))
liveness.linux.set() liveness.linux.set(rundir=config.get('CIC_RUN_DIR'))
current_app.worker_main(argv) current_app.worker_main(argv)
liveness.linux.reset() liveness.linux.reset(rundir=config.get('CIC_RUN_DIR'))
@celery.signals.eventlet_pool_postshutdown.connect @celery.signals.eventlet_pool_postshutdown.connect

View File

@ -12,10 +12,7 @@ from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.gas import RPCGasOracle from chainlib.eth.gas import RPCGasOracle
# local imports # local imports
from cic_eth.error import ( from cic_eth.error import SeppukuError
SignerError,
EthError,
)
from cic_eth.db.models.base import SessionBase from cic_eth.db.models.base import SessionBase
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@ -41,6 +38,18 @@ class BaseTask(celery.Task):
return return
def on_failure(self, exc, task_id, args, kwargs, einfo):
if isinstance(exc, SeppukuError):
logg.critical(einfo)
msg = 'received critical exception {}, calling shutdown'.format(str(exc))
s = celery.signature(
'cic_eth.admin.ctrl.shutdown',
[msg],
queue=self.request.delivery_info.get('routing_key'),
)
s.apply_async()
class CriticalTask(BaseTask): class CriticalTask(BaseTask):
retry_jitter = True retry_jitter = True
retry_backoff = True retry_backoff = True
@ -69,7 +78,6 @@ class CriticalSQLAlchemyAndWeb3Task(CriticalTask):
sqlalchemy.exc.TimeoutError, sqlalchemy.exc.TimeoutError,
requests.exceptions.ConnectionError, requests.exceptions.ConnectionError,
sqlalchemy.exc.ResourceClosedError, sqlalchemy.exc.ResourceClosedError,
EthError,
) )
safe_gas_threshold_amount = 2000000000 * 60000 * 3 safe_gas_threshold_amount = 2000000000 * 60000 * 3
safe_gas_refill_amount = safe_gas_threshold_amount * 5 safe_gas_refill_amount = safe_gas_threshold_amount * 5
@ -80,13 +88,11 @@ class CriticalSQLAlchemyAndSignerTask(CriticalTask):
sqlalchemy.exc.DatabaseError, sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError, sqlalchemy.exc.TimeoutError,
sqlalchemy.exc.ResourceClosedError, sqlalchemy.exc.ResourceClosedError,
SignerError,
) )
class CriticalWeb3AndSignerTask(CriticalTask): class CriticalWeb3AndSignerTask(CriticalTask):
autoretry_for = ( autoretry_for = (
requests.exceptions.ConnectionError, requests.exceptions.ConnectionError,
SignerError,
) )
safe_gas_threshold_amount = 2000000000 * 60000 * 3 safe_gas_threshold_amount = 2000000000 * 60000 * 3
safe_gas_refill_amount = safe_gas_threshold_amount * 5 safe_gas_refill_amount = safe_gas_threshold_amount * 5
@ -100,4 +106,4 @@ def hello(self):
@celery_app.task() @celery_app.task()
def check_health(self): def check_health(self):
celery.app.control.shutdown() pass

View File

@ -4,5 +4,5 @@ chain_spec = evm:bloxberg:8996
tx_retry_delay = tx_retry_delay =
trust_address = trust_address =
default_token_symbol = GFT default_token_symbol = GFT
health_modules = cic_eth.check.db,cic_eth.check.signer,cic_eth.check.gas health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas
run_dir = /run run_dir = /run

View File

@ -4,5 +4,5 @@ chain_spec = evm:bloxberg:8996
trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C
tx_retry_delay = 20 tx_retry_delay = 20
default_token_symbol = GFT default_token_symbol = GFT
health_modules = cic_eth.check.db,cic_eth.check.signer,cic_eth.check.gas health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas
run_dir = /run run_dir = /run

View File

@ -1,4 +1,4 @@
cic-base==0.1.2b1 cic-base==0.1.2b2
celery==4.4.7 celery==4.4.7
crypto-dev-signer~=0.4.14b3 crypto-dev-signer~=0.4.14b3
confini~=0.3.6rc3 confini~=0.3.6rc3