cic-eth: Add sanity checks for emergency shutdown / liveness tests
This commit is contained in:
@@ -32,7 +32,9 @@ def lock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.AL
|
||||
:returns: New lock state for address
|
||||
:rtype: number
|
||||
"""
|
||||
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
|
||||
chain_str = '::'
|
||||
if chain_spec_dict != None:
|
||||
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
|
||||
r = Lock.set(chain_str, flags, address=address, tx_hash=tx_hash)
|
||||
logg.debug('Locked {} for {}, flag now {}'.format(flags, address, r))
|
||||
return chained_input
|
||||
@@ -51,7 +53,9 @@ def unlock(chained_input, chain_spec_dict, address=ZERO_ADDRESS, flags=LockEnum.
|
||||
:returns: New lock state for address
|
||||
:rtype: number
|
||||
"""
|
||||
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
|
||||
chain_str = '::'
|
||||
if chain_spec_dict != None:
|
||||
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
|
||||
r = Lock.reset(chain_str, flags, address=address)
|
||||
logg.debug('Unlocked {} for {}, flag now {}'.format(flags, address, r))
|
||||
return chained_input
|
||||
@@ -127,7 +131,9 @@ def unlock_queue(chained_input, chain_spec_dict, address=ZERO_ADDRESS):
|
||||
|
||||
@celery_app.task(base=CriticalSQLAlchemyTask)
|
||||
def check_lock(chained_input, chain_spec_dict, lock_flags, address=None):
|
||||
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
|
||||
chain_str = '::'
|
||||
if chain_spec_dict != None:
|
||||
chain_str = str(ChainSpec.from_dict(chain_spec_dict))
|
||||
session = SessionBase.create_session()
|
||||
r = Lock.check(chain_str, lock_flags, address=ZERO_ADDRESS, session=session)
|
||||
if address != None:
|
||||
|
||||
0
apps/cic-eth/cic_eth/check/__init__.py
Normal file
0
apps/cic-eth/cic_eth/check/__init__.py
Normal file
@@ -1,5 +1,6 @@
|
||||
from cic_eth.db.models.base import SessionBase
|
||||
|
||||
|
||||
def health(*args, **kwargs):
|
||||
session = SessionBase.create_session()
|
||||
session.execute('SELECT count(*) from alembic_version')
|
||||
48
apps/cic-eth/cic_eth/check/gas.py
Normal file
48
apps/cic-eth/cic_eth/check/gas.py
Normal file
@@ -0,0 +1,48 @@
|
||||
# standard imports
|
||||
import logging
|
||||
|
||||
# external imports
|
||||
from chainlib.connection import RPCConnection
|
||||
from chainlib.chain import ChainSpec
|
||||
from chainlib.eth.gas import balance
|
||||
|
||||
# local imports
|
||||
from cic_eth.db.models.role import AccountRole
|
||||
from cic_eth.db.models.base import SessionBase
|
||||
from cic_eth.db.enum import LockEnum
|
||||
from cic_eth.error import LockedError
|
||||
from cic_eth.admin.ctrl import check_lock
|
||||
|
||||
logg = logging.getLogger().getChild(__name__)
|
||||
|
||||
|
||||
def health(*args, **kwargs):
|
||||
|
||||
session = SessionBase.create_session()
|
||||
|
||||
config = kwargs['config']
|
||||
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
|
||||
logg.debug('check gas balance of gas gifter for chain {}'.format(chain_spec))
|
||||
|
||||
try:
|
||||
check_lock(None, None, LockEnum.INIT)
|
||||
except LockedError:
|
||||
logg.warning('INIT lock is set, skipping GAS GIFTER balance check.')
|
||||
return True
|
||||
|
||||
gas_provider = AccountRole.get_address('GAS_GIFTER', session=session)
|
||||
session.close()
|
||||
|
||||
rpc = RPCConnection.connect(chain_spec, 'default')
|
||||
o = balance(gas_provider)
|
||||
r = rpc.do(o)
|
||||
try:
|
||||
r = int(r, 16)
|
||||
except TypeError:
|
||||
r = int(r)
|
||||
gas_min = int(config.get('ETH_GAS_GIFTER_MINIMUM_BALANCE'))
|
||||
if r < gas_min:
|
||||
logg.error('EEK! gas gifter has balance {}, below minimum {}'.format(r, gas_min))
|
||||
return False
|
||||
|
||||
return True
|
||||
37
apps/cic-eth/cic_eth/check/signer.py
Normal file
37
apps/cic-eth/cic_eth/check/signer.py
Normal file
@@ -0,0 +1,37 @@
|
||||
# standard imports
|
||||
import time
|
||||
import logging
|
||||
from urllib.error import URLError
|
||||
|
||||
# external imports
|
||||
from chainlib.connection import RPCConnection
|
||||
from chainlib.eth.constant import ZERO_ADDRESS
|
||||
from chainlib.eth.sign import sign_message
|
||||
from chainlib.error import JSONRPCException
|
||||
|
||||
logg = logging.getLogger().getChild(__name__)
|
||||
|
||||
|
||||
def health(*args, **kwargs):
|
||||
blocked = True
|
||||
max_attempts = 5
|
||||
conn = RPCConnection.connect(kwargs['config'].get('CIC_CHAIN_SPEC'), tag='signer')
|
||||
for i in range(max_attempts):
|
||||
idx = i + 1
|
||||
logg.debug('attempt signer connection check {}/{}'.format(idx, max_attempts))
|
||||
try:
|
||||
conn.do(sign_message(ZERO_ADDRESS, '0x2a'))
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
except ConnectionError:
|
||||
pass
|
||||
except URLError:
|
||||
pass
|
||||
except JSONRPCException:
|
||||
logg.debug('signer connection succeeded')
|
||||
return True
|
||||
|
||||
if idx < max_attempts:
|
||||
time.sleep(0.5)
|
||||
|
||||
return False
|
||||
@@ -74,10 +74,11 @@ class LockEnum(enum.IntEnum):
|
||||
QUEUE: Disable queueing new or modified transactions
|
||||
"""
|
||||
STICKY=1
|
||||
CREATE=2
|
||||
SEND=4
|
||||
QUEUE=8
|
||||
QUERY=16
|
||||
INIT=2
|
||||
CREATE=4
|
||||
SEND=8
|
||||
QUEUE=16
|
||||
QUERY=32
|
||||
ALL=int(0xfffffffffffffffe)
|
||||
|
||||
|
||||
|
||||
@@ -5,8 +5,11 @@ Revises: 1f1b3b641d08
|
||||
Create Date: 2021-04-02 18:41:20.864265
|
||||
|
||||
"""
|
||||
import datetime
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from chainlib.eth.constant import ZERO_ADDRESS
|
||||
from cic_eth.db.enum import LockEnum
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
@@ -23,10 +26,11 @@ def upgrade():
|
||||
sa.Column("address", sa.String(42), nullable=True),
|
||||
sa.Column('blockchain', sa.String),
|
||||
sa.Column("flags", sa.BIGINT(), nullable=False, default=0),
|
||||
sa.Column("date_created", sa.DateTime, nullable=False),
|
||||
sa.Column("date_created", sa.DateTime, nullable=False, default=datetime.datetime.utcnow),
|
||||
sa.Column("otx_id", sa.Integer, sa.ForeignKey('otx.id'), nullable=True),
|
||||
)
|
||||
op.create_index('idx_chain_address', 'lock', ['blockchain', 'address'], unique=True)
|
||||
op.execute("INSERT INTO lock (address, date_created, blockchain, flags) VALUES('{}', '{}', '::', {})".format(ZERO_ADDRESS, datetime.datetime.utcnow(), LockEnum.INIT | LockEnum.SEND | LockEnum.QUEUE))
|
||||
|
||||
|
||||
def downgrade():
|
||||
|
||||
@@ -59,6 +59,7 @@ args_override = {
|
||||
'CIC_CHAIN_SPEC': getattr(args, 'i'),
|
||||
}
|
||||
# override args
|
||||
config.dict_override(args_override, 'cli')
|
||||
config.censor('PASSWORD', 'DATABASE')
|
||||
config.censor('PASSWORD', 'SSL')
|
||||
logg.debug('config loaded from {}:\n{}'.format(config_dir, config))
|
||||
@@ -67,7 +68,9 @@ celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=confi
|
||||
|
||||
queue = args.q
|
||||
|
||||
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
|
||||
chain_spec = None
|
||||
if config.get('CIC_CHAIN_SPEC') != None and config.get('CIC_CHAIN_SPEC') != '::':
|
||||
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
|
||||
admin_api = AdminApi(None)
|
||||
|
||||
|
||||
@@ -82,6 +85,9 @@ def lock_names_to_flag(s):
|
||||
|
||||
# TODO: move each command to submodule
|
||||
def main():
|
||||
chain_spec_dict = None
|
||||
if chain_spec != None:
|
||||
chain_spec_dict = chain_spec.asdict()
|
||||
if args.command == 'unlock':
|
||||
flags = lock_names_to_flag(args.flags)
|
||||
if not is_checksum_address(args.address):
|
||||
@@ -91,7 +97,7 @@ def main():
|
||||
'cic_eth.admin.ctrl.unlock',
|
||||
[
|
||||
None,
|
||||
chain_spec.asdict(),
|
||||
chain_spec_dict,
|
||||
args.address,
|
||||
flags,
|
||||
],
|
||||
@@ -110,7 +116,7 @@ def main():
|
||||
'cic_eth.admin.ctrl.lock',
|
||||
[
|
||||
None,
|
||||
chain_spec.asdict(),
|
||||
chain_spec_dict,
|
||||
args.address,
|
||||
flags,
|
||||
],
|
||||
|
||||
@@ -15,8 +15,10 @@ from chainlib.connection import RPCConnection
|
||||
from chainlib.eth.connection import EthUnixSignerConnection
|
||||
from chainlib.chain import ChainSpec
|
||||
from chainqueue.db.models.otx import Otx
|
||||
from cic_eth_registry.error import UnknownContractError
|
||||
import liveness.linux
|
||||
|
||||
|
||||
# local imports
|
||||
from cic_eth.eth import (
|
||||
erc20,
|
||||
@@ -138,11 +140,15 @@ else:
|
||||
|
||||
chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC'))
|
||||
RPCConnection.register_location(config.get('ETH_PROVIDER'), chain_spec, 'default')
|
||||
RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer', constructor=EthUnixSignerConnection)
|
||||
#RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer', constructor=EthUnixSignerConnection)
|
||||
RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 'signer')
|
||||
|
||||
Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS')
|
||||
|
||||
liveness.linux.load(health_modules)
|
||||
#import cic_eth.checks.gas
|
||||
#if not cic_eth.checks.gas.health(config=config):
|
||||
# raise RuntimeError()
|
||||
liveness.linux.load(health_modules, rundir=config.get('CIC_RUN_DIR'), config=config)
|
||||
|
||||
def main():
|
||||
argv = ['worker']
|
||||
@@ -166,7 +172,11 @@ def main():
|
||||
|
||||
rpc = RPCConnection.connect(chain_spec, 'default')
|
||||
|
||||
connect_registry(rpc, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
|
||||
try:
|
||||
connect_registry(rpc, chain_spec, config.get('CIC_REGISTRY_ADDRESS'))
|
||||
except UnknownContractError as e:
|
||||
logg.exception('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e))
|
||||
sys.exit(1)
|
||||
|
||||
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
|
||||
if trusted_addresses_src == None:
|
||||
@@ -175,6 +185,7 @@ def main():
|
||||
trusted_addresses = trusted_addresses_src.split(',')
|
||||
for address in trusted_addresses:
|
||||
logg.info('using trusted address {}'.format(address))
|
||||
|
||||
connect_declarator(rpc, chain_spec, trusted_addresses)
|
||||
connect_token_registry(rpc, chain_spec)
|
||||
|
||||
|
||||
@@ -94,3 +94,8 @@ class CriticalWeb3AndSignerTask(CriticalTask):
|
||||
def hello(self):
|
||||
time.sleep(0.1)
|
||||
return id(SessionBase.create_session)
|
||||
|
||||
|
||||
@celery_app.task()
|
||||
def check_health(self):
|
||||
celery.app.control.shutdown()
|
||||
|
||||
@@ -10,7 +10,7 @@ version = (
|
||||
0,
|
||||
11,
|
||||
0,
|
||||
'beta.7',
|
||||
'beta.8',
|
||||
)
|
||||
|
||||
version_object = semver.VersionInfo(
|
||||
|
||||
Reference in New Issue
Block a user