Merge branch 'lash/emergency-shutdown-II' into 'master'

Add remaining health checks and shutdown on critical errors

See merge request grassrootseconomics/cic-internal-integration!115
This commit is contained in:
Louis Holbrook 2021-04-24 17:53:45 +00:00
commit 32b72274f5
13 changed files with 128 additions and 40 deletions

View File

@ -2,7 +2,7 @@
import datetime import datetime
import logging import logging
# third-party imports # external imports
import celery import celery
from chainlib.eth.constant import ZERO_ADDRESS from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
@ -145,3 +145,9 @@ def check_lock(chained_input, chain_spec_dict, lock_flags, address=None):
session.flush() session.flush()
session.close() session.close()
return chained_input return chained_input
@celery_app.task()
def shutdown(message):
logg.critical('shutdown called: {}'.format(message))
celery_app.control.shutdown() #broadcast('shutdown')

View File

@ -0,0 +1,18 @@
# external imports
import redis
import os
def health(*args, **kwargs):
r = redis.Redis(
host=kwargs['config'].get('REDIS_HOST'),
port=kwargs['config'].get('REDIS_PORT'),
db=kwargs['config'].get('REDIS_DB'),
)
try:
r.set(kwargs['unit'], os.getpid())
except redis.connection.ConnectionError:
return False
except redis.connection.ResponseError:
return False
return True

View File

@ -48,6 +48,8 @@ class RoleMissingError(Exception):
pass pass
class IntegrityError(Exception): class IntegrityError(Exception):
"""Exception raised to signal irregularities with deduplication and ordering of tasks """Exception raised to signal irregularities with deduplication and ordering of tasks
@ -62,15 +64,19 @@ class LockedError(Exception):
pass pass
class SignerError(Exception): class SeppukuError(Exception):
"""Exception base class for all errors that should cause system shutdown
"""
class SignerError(SeppukuError):
"""Exception raised when signer is unavailable or generates an error """Exception raised when signer is unavailable or generates an error
""" """
pass pass
class EthError(Exception): class RoleAgencyError(SeppukuError):
"""Exception raised when unspecified error from evm node is encountered """Exception raise when a role cannot perform its function. This is a critical exception
""" """
pass

View File

@ -4,10 +4,10 @@ import logging
# external imports # external imports
import celery import celery
from erc20_single_shot_faucet import SingleShotFaucet as Faucet from erc20_single_shot_faucet import SingleShotFaucet as Faucet
from chainlib.eth.constant import ZERO_ADDRESS
from hexathon import ( from hexathon import (
strip_0x, strip_0x,
) )
from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.connection import RPCConnection from chainlib.connection import RPCConnection
from chainlib.eth.sign import ( from chainlib.eth.sign import (
new_account, new_account,
@ -19,6 +19,7 @@ from chainlib.eth.tx import (
unpack, unpack,
) )
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.error import JSONRPCException
from eth_accounts_index import AccountRegistry from eth_accounts_index import AccountRegistry
from sarafu_faucet import MinterFaucet as Faucet from sarafu_faucet import MinterFaucet as Faucet
from chainqueue.db.models.tx import TxCache from chainqueue.db.models.tx import TxCache
@ -70,11 +71,18 @@ def create(self, password, chain_spec_dict):
a = None a = None
conn = RPCConnection.connect(chain_spec, 'signer') conn = RPCConnection.connect(chain_spec, 'signer')
o = new_account() o = new_account()
try:
a = conn.do(o) a = conn.do(o)
except ConnectionError as e:
raise SignerError(e)
except FileNotFoundError as e:
raise SignerError(e)
conn.disconnect() conn.disconnect()
# TODO: It seems infeasible that a can be None in any case, verify
if a == None: if a == None:
raise SignerError('create account') raise SignerError('create account')
logg.debug('created account {}'.format(a)) logg.debug('created account {}'.format(a))
# Initialize nonce provider record for account # Initialize nonce provider record for account
@ -219,21 +227,22 @@ def have(self, account, chain_spec_dict):
""" """
chain_spec = ChainSpec.from_dict(chain_spec_dict) chain_spec = ChainSpec.from_dict(chain_spec_dict)
o = sign_message(account, '0x2a') o = sign_message(account, '0x2a')
try:
conn = RPCConnection.connect(chain_spec, 'signer') conn = RPCConnection.connect(chain_spec, 'signer')
except Exception as e:
logg.debug('cannot sign with {}: {}'.format(account, e))
return None
try: try:
conn.do(o) conn.do(o)
conn.disconnect() except ConnectionError as e:
return account raise SignerError(e)
except Exception as e: except FileNotFoundError as e:
raise SignerError(e)
except JSONRPCException as e:
logg.debug('cannot sign with {}: {}'.format(account, e)) logg.debug('cannot sign with {}: {}'.format(account, e))
conn.disconnect() conn.disconnect()
return None return None
conn.disconnect()
return account
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask) @celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def set_role(self, tag, address, chain_spec_dict): def set_role(self, tag, address, chain_spec_dict):

View File

@ -108,7 +108,13 @@ def transfer(self, tokens, holder_address, receiver_address, value, chain_spec_d
nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session) nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas) gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas)
c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle) c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
try:
(tx_hash_hex, tx_signed_raw_hex) = c.transfer(t['address'], holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = c.transfer(t['address'], holder_address, receiver_address, value, tx_format=TxFormat.RLP_SIGNED)
except FileNotFoundError as e:
raise SignerError(e)
except ConnectionError as e:
raise SignerError(e)
rpc_signer.disconnect() rpc_signer.disconnect()
rpc.disconnect() rpc.disconnect()
@ -171,7 +177,12 @@ def approve(self, tokens, holder_address, spender_address, value, chain_spec_dic
nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session) nonce_oracle = CustodialTaskNonceOracle(holder_address, self.request.root_id, session=session)
gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas) gas_oracle = self.create_gas_oracle(rpc, MaxGasOracle.gas)
c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle) c = ERC20(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle, nonce_oracle=nonce_oracle)
try:
(tx_hash_hex, tx_signed_raw_hex) = c.approve(t['address'], holder_address, spender_address, value, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = c.approve(t['address'], holder_address, spender_address, value, tx_format=TxFormat.RLP_SIGNED)
except FileNotFoundError as e:
raise SignerError(e)
except ConnectionError as e:
raise SignerError(e)
rpc_signer.disconnect() rpc_signer.disconnect()
rpc.disconnect() rpc.disconnect()

View File

@ -328,7 +328,12 @@ def refill_gas(self, recipient_address, chain_spec_dict):
# build and add transaction # build and add transaction
logg.debug('tx send gas amount {} from provider {} to {}'.format(refill_amount, gas_provider, recipient_address)) logg.debug('tx send gas amount {} from provider {} to {}'.format(refill_amount, gas_provider, recipient_address))
try:
(tx_hash_hex, tx_signed_raw_hex) = c.create(gas_provider, recipient_address, refill_amount, tx_format=TxFormat.RLP_SIGNED) (tx_hash_hex, tx_signed_raw_hex) = c.create(gas_provider, recipient_address, refill_amount, tx_format=TxFormat.RLP_SIGNED)
except ConnectionError as e:
raise SignerError(e)
except FileNotFoundError as e:
raise SignerError(e)
logg.debug('adding queue refill gas tx {}'.format(tx_hash_hex)) logg.debug('adding queue refill gas tx {}'.format(tx_hash_hex))
cache_task = 'cic_eth.eth.gas.cache_gas_data' cache_task = 'cic_eth.eth.gas.cache_gas_data'
register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session) register_tx(tx_hash_hex, tx_signed_raw_hex, chain_spec, queue, cache_task=cache_task, session=session)
@ -404,7 +409,12 @@ def resend_with_higher_gas(self, txold_hash_hex, chain_spec_dict, gas=None, defa
c = TxFactory(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle) c = TxFactory(chain_spec, signer=rpc_signer, gas_oracle=gas_oracle)
logg.debug('change gas price from old {} to new {} for tx {}'.format(tx['gasPrice'], new_gas_price, tx)) logg.debug('change gas price from old {} to new {} for tx {}'.format(tx['gasPrice'], new_gas_price, tx))
tx['gasPrice'] = new_gas_price tx['gasPrice'] = new_gas_price
try:
(tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx) (tx_hash_hex, tx_signed_raw_hex) = c.build_raw(tx)
except ConnectionError as e:
raise SignerError(e)
except FileNotFoundError as e:
raise SignerError(e)
queue_create( queue_create(
chain_spec, chain_spec,
tx['nonce'], tx['nonce'],

View File

@ -11,8 +11,14 @@ import websocket
# external imports # external imports
import celery import celery
import confini import confini
from chainlib.connection import RPCConnection from chainlib.connection import (
from chainlib.eth.connection import EthUnixSignerConnection RPCConnection,
ConnType,
)
from chainlib.eth.connection import (
EthUnixSignerConnection,
EthHTTPSignerConnection,
)
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainqueue.db.models.otx import Otx from chainqueue.db.models.otx import Otx
from cic_eth_registry.error import UnknownContractError from cic_eth_registry.error import UnknownContractError
@ -143,8 +149,10 @@ else:
}) })
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
RPCConnection.register_constructor(ConnType.UNIX, EthUnixSignerConnection, 'signer')
RPCConnection.register_constructor(ConnType.HTTP, EthHTTPSignerConnection, 'signer')
RPCConnection.register_constructor(ConnType.HTTP_SSL, EthHTTPSignerConnection, 'signer')
RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default') RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
#RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer', constructor=EthUnixSignerConnection)
RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer') RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer')
Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS') Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS')
@ -152,7 +160,7 @@ Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS')
#import cic_eth.checks.gas #import cic_eth.checks.gas
#if not cic_eth.checks.gas.health(config=config): #if not cic_eth.checks.gas.health(config=config):
# raise RuntimeError() # raise RuntimeError()
liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config) liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config, unit='cic-eth-tasker')
def main(): def main():
argv = ['worker'] argv = ['worker']
@ -195,11 +203,12 @@ def main():
BaseTask.default_token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL') BaseTask.default_token_symbol = config.get('CIC_DEFAULT_TOKEN_SYMBOL')
BaseTask.default_token_address = registry.by_name(BaseTask.default_token_symbol) BaseTask.default_token_address = registry.by_name(BaseTask.default_token_symbol)
BaseTask.run_dir = config.get('CIC_RUN_DIR')
logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address)) logg.info('default token set to {} {}'.format(BaseTask.default_token_symbol, BaseTask.default_token_address))
liveness.linux.set() liveness.linux.set(rundir=config.get('CIC_RUN_DIR'))
current_app.worker_main(argv) current_app.worker_main(argv)
liveness.linux.reset() liveness.linux.reset(rundir=config.get('CIC_RUN_DIR'))
@celery.signals.eventlet_pool_postshutdown.connect @celery.signals.eventlet_pool_postshutdown.connect

View File

@ -10,15 +10,13 @@ import sqlalchemy
from chainlib.eth.constant import ZERO_ADDRESS from chainlib.eth.constant import ZERO_ADDRESS
from chainlib.eth.nonce import RPCNonceOracle from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.gas import RPCGasOracle from chainlib.eth.gas import RPCGasOracle
import liveness.linux
# local imports # local imports
from cic_eth.error import ( from cic_eth.error import SeppukuError
SignerError,
EthError,
)
from cic_eth.db.models.base import SessionBase from cic_eth.db.models.base import SessionBase
logg = logging.getLogger(__name__) logg = logging.getLogger().getChild(__name__)
celery_app = celery.current_app celery_app = celery.current_app
@ -31,6 +29,7 @@ class BaseTask(celery.Task):
create_gas_oracle = RPCGasOracle create_gas_oracle = RPCGasOracle
default_token_address = None default_token_address = None
default_token_symbol = None default_token_symbol = None
run_dir = '/run'
def create_session(self): def create_session(self):
return BaseTask.session_func() return BaseTask.session_func()
@ -41,6 +40,19 @@ class BaseTask(celery.Task):
return return
def on_failure(self, exc, task_id, args, kwargs, einfo):
if isinstance(exc, SeppukuError):
liveness.linux.reset(rundir=self.run_dir)
logg.critical(einfo)
msg = 'received critical exception {}, calling shutdown'.format(str(exc))
s = celery.signature(
'cic_eth.admin.ctrl.shutdown',
[msg],
queue=self.request.delivery_info.get('routing_key'),
)
s.apply_async()
class CriticalTask(BaseTask): class CriticalTask(BaseTask):
retry_jitter = True retry_jitter = True
retry_backoff = True retry_backoff = True
@ -69,7 +81,6 @@ class CriticalSQLAlchemyAndWeb3Task(CriticalTask):
sqlalchemy.exc.TimeoutError, sqlalchemy.exc.TimeoutError,
requests.exceptions.ConnectionError, requests.exceptions.ConnectionError,
sqlalchemy.exc.ResourceClosedError, sqlalchemy.exc.ResourceClosedError,
EthError,
) )
safe_gas_threshold_amount = 2000000000 * 60000 * 3 safe_gas_threshold_amount = 2000000000 * 60000 * 3
safe_gas_refill_amount = safe_gas_threshold_amount * 5 safe_gas_refill_amount = safe_gas_threshold_amount * 5
@ -80,13 +91,11 @@ class CriticalSQLAlchemyAndSignerTask(CriticalTask):
sqlalchemy.exc.DatabaseError, sqlalchemy.exc.DatabaseError,
sqlalchemy.exc.TimeoutError, sqlalchemy.exc.TimeoutError,
sqlalchemy.exc.ResourceClosedError, sqlalchemy.exc.ResourceClosedError,
SignerError,
) )
class CriticalWeb3AndSignerTask(CriticalTask): class CriticalWeb3AndSignerTask(CriticalTask):
autoretry_for = ( autoretry_for = (
requests.exceptions.ConnectionError, requests.exceptions.ConnectionError,
SignerError,
) )
safe_gas_threshold_amount = 2000000000 * 60000 * 3 safe_gas_threshold_amount = 2000000000 * 60000 * 3
safe_gas_refill_amount = safe_gas_threshold_amount * 5 safe_gas_refill_amount = safe_gas_threshold_amount * 5
@ -100,4 +109,4 @@ def hello(self):
@celery_app.task() @celery_app.task()
def check_health(self): def check_health(self):
celery.app.control.shutdown() pass

View File

@ -4,5 +4,5 @@ chain_spec = evm:bloxberg:8996
tx_retry_delay = tx_retry_delay =
trust_address = trust_address =
default_token_symbol = GFT default_token_symbol = GFT
health_modules = cic_eth.check.db,cic_eth.check.signer,cic_eth.check.gas health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas
run_dir = /run run_dir = /run

View File

@ -4,5 +4,5 @@ chain_spec = evm:bloxberg:8996
trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C trust_address = 0xEb3907eCad74a0013c259D5874AE7f22DcBcC95C
tx_retry_delay = 20 tx_retry_delay = 20
default_token_symbol = GFT default_token_symbol = GFT
health_modules = cic_eth.check.db,cic_eth.check.signer,cic_eth.check.gas health_modules = cic_eth.check.db,cic_eth.check.redis,cic_eth.check.signer,cic_eth.check.gas
run_dir = /run run_dir = /run

View File

@ -1,4 +1,4 @@
cic-base==0.1.2b1 cic-base==0.1.2b2
celery==4.4.7 celery==4.4.7
crypto-dev-signer~=0.4.14b3 crypto-dev-signer~=0.4.14b3
confini~=0.3.6rc3 confini~=0.3.6rc3

View File

@ -42,6 +42,7 @@ def load(check_strs, namespace=default_namespace, rundir='/run', *args, **kwargs
def set(error=0, namespace=default_namespace, rundir='/run'): def set(error=0, namespace=default_namespace, rundir='/run'):
logg.info('liveness SET error {} for namespace {}'.format(error, namespace))
app_rundir = os.path.join(rundir, namespace) app_rundir = os.path.join(rundir, namespace)
f = open(os.path.join(app_rundir, 'error'), 'w') f = open(os.path.join(app_rundir, 'error'), 'w')
f.write(str(error)) f.write(str(error))
@ -49,6 +50,13 @@ def set(error=0, namespace=default_namespace, rundir='/run'):
def reset(namespace=default_namespace, rundir='/run'): def reset(namespace=default_namespace, rundir='/run'):
logg.info('liveness RESET for namespace {}'.format(namespace))
app_rundir = os.path.join(rundir, namespace) app_rundir = os.path.join(rundir, namespace)
try:
os.unlink(os.path.join(app_rundir, 'pid')) os.unlink(os.path.join(app_rundir, 'pid'))
except FileNotFoundError:
pass
try:
os.unlink(os.path.join(app_rundir, 'error')) os.unlink(os.path.join(app_rundir, 'error'))
except FileNotFoundError:
pass

View File

@ -240,6 +240,8 @@ services:
DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2} DATABASE_DRIVER: ${DATABASE_DRIVER:-psycopg2}
DATABASE_DEBUG: ${DATABASE_DEBUG:-0} DATABASE_DEBUG: ${DATABASE_DEBUG:-0}
DATABASE_POOL_SIZE: 0 DATABASE_POOL_SIZE: 0
REDIS_PORT: 6379
REDIS_HOST: redis
PGPASSWORD: ${DATABASE_PASSWORD:-tralala} PGPASSWORD: ${DATABASE_PASSWORD:-tralala}
CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-evm:bloxberg:8996} CIC_CHAIN_SPEC: ${CIC_CHAIN_SPEC:-evm:bloxberg:8996}
BANCOR_DIR: ${BANCOR_DIR:-/usr/local/share/cic/bancor} BANCOR_DIR: ${BANCOR_DIR:-/usr/local/share/cic/bancor}