Merge branch 'lash/health-util' into 'master'

K8s health utilities for cic containers

See merge request grassrootseconomics/cic-internal-integration!108
This commit is contained in:
Louis Holbrook 2021-04-21 17:34:13 +00:00
commit 660d524401
21 changed files with 471 additions and 6 deletions

4
.gitignore vendored
View File

@ -4,3 +4,7 @@ service-configs/*
__pycache__ __pycache__
*.pyc *.pyc
*.o *.o
gmon.out
*.egg-info
dist/
build/

View File

@ -2,4 +2,9 @@
. ./db.sh . ./db.sh
if [ $? -ne "0" ]; then
>&2 echo db migrate fail
exit 1
fi
/usr/local/bin/cic-cache-trackerd $@ /usr/local/bin/cic-cache-trackerd $@

View File

@ -0,0 +1,7 @@
from cic_eth.db.models.base import SessionBase
def health(*args, **kwargs):
    """Liveness check: verify that the database answers a minimal query.

    Opens a session and counts the rows of the ``alembic_version`` table.
    Any exception raised while creating the session or executing the query
    is not trapped here and propagates to the caller.

    :param args: Ignored
    :param kwargs: Ignored
    :returns: True if the query was executed
    :rtype: bool
    """
    session = SessionBase.create_session()
    try:
        session.execute('SELECT count(*) from alembic_version')
    finally:
        # hand the session back also when the query raises
        session.close()
    return True

View File

@ -15,6 +15,7 @@ from chainlib.connection import RPCConnection
from chainlib.eth.connection import EthUnixSignerConnection from chainlib.eth.connection import EthUnixSignerConnection
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainqueue.db.models.otx import Otx from chainqueue.db.models.otx import Otx
import liveness.linux
# local imports # local imports
from cic_eth.eth import ( from cic_eth.eth import (
@ -52,6 +53,7 @@ from cic_eth.registry import (
connect_token_registry, connect_token_registry,
) )
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
@ -90,14 +92,15 @@ config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL') config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}:\n{}'.format(args.c, config)) logg.debug('config loaded from {}:\n{}'.format(args.c, config))
# Comma-separated module names from config, normalized to a list of names
# before being handed to liveness.linux.load().
health_modules = config.get('CIC_HEALTH_MODULES', [])
if isinstance(health_modules, str):
    # tolerate whitespace and stray commas ("a, b,") in the config value
    health_modules = [m.strip() for m in health_modules.split(',') if m.strip()]
elif not health_modules:
    # directive present but without a value: run no health checks
    health_modules = []
logg.debug('health mods {}'.format(health_modules))
# connect to database # connect to database
dsn = dsn_from_config(config) dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG')) SessionBase.connect(dsn, pool_size=int(config.get('DATABASE_POOL_SIZE')), debug=config.true('DATABASE_DEBUG'))
# verify database connection with minimal sanity query
session = SessionBase.create_session()
session.execute('select version_num from alembic_version')
session.close()
# set up celery # set up celery
current_app = celery.Celery(__name__) current_app = celery.Celery(__name__)
@ -139,6 +142,7 @@ RPCConnection.register_location(config.get('SIGNER_SOCKET_PATH'), chain_spec, 's
Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS') Otx.tracing = config.true('TASKS_TRACE_QUEUE_STATUS')
liveness.linux.load(health_modules)
def main(): def main():
argv = ['worker'] argv = ['worker']
@ -174,7 +178,9 @@ def main():
connect_declarator(rpc, chain_spec, trusted_addresses) connect_declarator(rpc, chain_spec, trusted_addresses)
connect_token_registry(rpc, chain_spec) connect_token_registry(rpc, chain_spec)
liveness.linux.set()
current_app.worker_main(argv) current_app.worker_main(argv)
liveness.linux.reset()
@celery.signals.eventlet_pool_postshutdown.connect @celery.signals.eventlet_pool_postshutdown.connect

View File

@ -3,3 +3,4 @@ registry_address =
chain_spec = evm:bloxberg:8996 chain_spec = evm:bloxberg:8996
tx_retry_delay = tx_retry_delay =
trust_address = trust_address =
health_modules = cic_eth.k8s.db

View File

@ -53,3 +53,5 @@ COPY cic-eth/crypto_dev_signer_config/ /usr/local/etc/crypto-dev-signer/
RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \ RUN git clone https://gitlab.com/grassrootseconomics/cic-contracts.git && \
mkdir -p /usr/local/share/cic/solidity && \ mkdir -p /usr/local/share/cic/solidity && \
cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi cp -R cic-contracts/abis /usr/local/share/cic/solidity/abi
COPY util/liveness/health.sh /usr/local/bin/health.sh

View File

@ -1,4 +1,4 @@
cic-base~=0.1.2a76 cic-base==0.1.2a79+build.35e442bc
celery==4.4.7 celery==4.4.7
crypto-dev-signer~=0.4.14b2 crypto-dev-signer~=0.4.14b2
confini~=0.3.6rc3 confini~=0.3.6rc3

View File

@ -38,6 +38,7 @@ packages =
cic_eth.runnable.daemons.filters cic_eth.runnable.daemons.filters
cic_eth.callbacks cic_eth.callbacks
cic_eth.sync cic_eth.sync
cic_eth.k8s
scripts = scripts =
./scripts/migrate.py ./scripts/migrate.py

View File

@ -0,0 +1 @@
include *health*.sh

View File

@ -0,0 +1,10 @@
# Render the texinfo sources to HTML.
docs:
	mkdir -p doc/texinfo/html
	makeinfo doc/texinfo/index.texi --html -o doc/texinfo/html/

# Regenerate README.md from the rendered HTML, rebuilding the HTML first.
markdown: docs
	pandoc -f html -t markdown --standalone doc/texinfo/html/liveness.html -o README.md

# Build the source distribution.
dist:
	python setup.py sdist

# None of the targets names a file it produces. Without this declaration an
# existing dist/ directory would make `make dist` a no-op.
.PHONY: docs markdown dist

View File

@ -0,0 +1,105 @@
---
description: liveness (Untitled Document)
distribution: global
Generator: makeinfo
keywords: liveness (Untitled Document)
lang: en
resource-type: document
title: liveness (Untitled Document)
---
[]{#liveness}[]{#liveness-1}
## 1 liveness {#liveness .chapter}
[]{#ilveness_005foverview}[]{#Overview}
### 1.1 Overview {#overview .section}
This is a cluster-specific convenience setup for enabling a
Kubernetes-style liveness/readiness test as outlined in
<https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/>.
Conceptually, it provides an application with means to:
- Run a collection of functions to validate sanity of the environment
- Set a no-error state before execution of the main routine
- Modify the error state during execution
- Invalidating all state when execution ends
[]{#Python-module}
### 1.2 Python module {#python-module .section}
Three python methods are provided.
[]{#load}
#### 1.2.1 load {#load .subsection}
This is meant to be called after configurations and environment have been
set up, but before the execution logic has commenced.
It receives a list of externally defined fully-qualified python modules.
Each of these modules must implement the method `health(*args,**kwargs)`
in its global namespace.
Any module returning `False` will cause a `RuntimeError`.
The component will not trap any other exception from the modules.
If successful, it will write the `pid` of the application to the
specified run data folder. By default this is `/run/<HOSTNAME>`, but the
path can be modified if desired.
[]{#set}
#### 1.2.2 set {#set .subsection}
This is meant to be called during the execution of the main program
routine.
[]{#at-startup}
#### 1.2.2.1 at startup {#at-startup .subsubsection}
It should be called once at the *start* of execution of the main program
routine.
For one-shot routines, this would mean the start of any code only run
when the module name is `__main__`.
For daemons, it would be just before handing over execution to the main
loop.
[]{#during-execution}
#### 1.2.2.2 during execution {#during-execution .subsubsection}
Call `set(error=<error>, ...)` any time the health state temporarily
changes. Any `error` value other than `0` is considered an unhealthy
state.
[]{#at-shutdown}
#### 1.2.2.3 at shutdown {#at-shutdown .subsubsection}
Call `reset(...)`, which will indicate that the state is to be
considered the same as at startup.
[]{#shell}
### 1.3 shell {#shell .section}
A bash script is provided for *Kubernetes* to perform the health check.
It performs the following checks:
1. A numeric value exists in `<rundir>/<unitname>/pid`{.sample}.
2. The numeric value is a directory in `/proc`{.sample} (a valid pid)
3. The file `<rundir>/<unitname>/error`{.sample} contains \"0\"
If any of these checks fail, the container should be considered
unhealthy.
------------------------------------------------------------------------

View File

@ -0,0 +1,71 @@
@node liveness
@chapter liveness
@anchor{ilveness_overview}
@section Overview
This is a cluster-specific convenience setup for enabling a Kubernetes-style liveness/readiness test as outlined in @url{https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/}.
Conceptually, it provides an application with means to:
@itemize
@item Run a collection of functions to validate sanity of the environment
@item Set a no-error state before execution of the main routine
@item Modify the error state during execution
@item Invalidating all state when execution ends
@end itemize
@section Python module
Three python methods are provided.
@subsection load
This is meant to be called after configurations and environment have been set up, but before the execution logic has commenced.
It receives a list of externally defined fully-qualified python modules. Each of these modules must implement the method @code{health(*args,**kwargs)} in its global namespace.
Any module returning @code{False} will cause a @code{RuntimeError}.
The component will not trap any other exception from the modules.
If successful, it will write the @code{pid} of the application to the specified run data folder. By default this is @code{/run/<HOSTNAME>}, but the path can be modified if desired.
@subsection set
This is meant to be called during the execution of the main program routine.
@subsubsection at startup
It should be called once at the @emph{start} of execution of the main program routine.
For one-shot routines, this would mean the start of any code only run when the module name is @code{__main__}.
For daemons, it would be just before handing over execution to the main loop.
@subsubsection during execution
Call @code{set(error=<error>, ...)} any time the health state temporarily changes. Any @code{error} value other than @code{0} is considered an unhealthy state.
@subsubsection at shutdown
Call @code{reset(...)}, which will indicate that the state is to be considered the same as at startup.
@section shell
A bash script is provided for @emph{Kubernetes} to perform the health check.
It performs the following checks:
@enumerate
@item A numeric value exists in @file{<rundir>/<unitname>/pid}.
@item The numeric value is a directory in @file{/proc} (a valid pid)
@item The file @file{<rundir>/<unitname>/error} contains "0"
@end enumerate
If any of these checks fail, the container should be considered unhealthy.

View File

@ -0,0 +1,35 @@
#!/bin/bash

# Liveness probe for a unit.
#
# Succeeds only if the pid recorded for the unit is a live process and the
# recorded error state is exactly "0".
#
# Environment:
#   CIC_RUNDIR          base directory for run data (default: /run)
#   CIC_UNIT            unit name, a subdirectory of the run dir
#   LIVENESS_UNIT_NAME  used when CIC_UNIT is unset, matching the name the
#                       python module reads; falls back to $HOSTNAME
#
# The script signals failure with `exit`, and deliberately has no `exit 0`
# at the end: when it is sourced, a passing check returns control to the
# calling shell while a failing check terminates it.

rundir=${CIC_RUNDIR:-/run}
unit=${CIC_UNIT:-${LIVENESS_UNIT_NAME:-$HOSTNAME}}

p=
if [ -r "$rundir/$unit/pid" ]; then
    # the pid file has no trailing newline, so read returns non-zero
    # although it still assigns the value
    read -r p < "$rundir/$unit/pid"
fi
if [ -z "$p" ]; then
    >&2 echo unit $unit has no pid
    exit 1
fi

# only a plain decimal number is a pid; this also keeps values such as
# "self" or ".." from matching a directory below /proc
case "$p" in
    *[!0-9]*)
        >&2 echo unit $unit reports invalid pid $p
        exit 1
        ;;
esac

if [ ! -d "/proc/$p" ]; then
    >&2 echo unit $unit reports non-existent pid $p
    exit 1
fi
>&2 echo unit $unit has pid $p

if [ ! -f "$rundir/$unit/error" ]; then
    >&2 echo unit $unit has unspecified state
    exit 1
fi

e=
read -r e 2> /dev/null < "$rundir/$unit/error"
if [ -z "$e" ]; then
    >&2 echo unit $unit has unspecified state
    exit 1
fi
>&2 echo unit $unit has error $e

# Anything but a literal 0 is unhealthy. A numeric comparison would let
# negative values pass and would error out (and thereby pass) on
# non-numeric content.
if [ "$e" != "0" ]; then
    exit 1
fi

View File

@ -0,0 +1,54 @@
# standard imports
import importlib
import sys
import os
import logging
# Module logger, a child of the root logger named after this module.
logg = logging.getLogger().getChild(__name__)

# Process id captured once at import time; load() writes it to the pid file.
pid = os.getpid()

# Default unit name: LIVENESS_UNIT_NAME from the environment if set,
# otherwise the host name.
default_namespace = os.environ.get('LIVENESS_UNIT_NAME')
if default_namespace is None:
    import socket
    default_namespace = socket.gethostname()
def load(check_strs, namespace=default_namespace, rundir='/run', *args, **kwargs):
    """Run all health checks and, if none fails, record the process pid.

    Every entry of check_strs is imported as a module, and the ``health``
    callable in each module is invoked. All modules are imported before the
    first check runs, so an import error aborts before any check executes.
    Exceptions raised by imports or checks are not trapped.

    On success the pid is written to ``<rundir>/<namespace>/pid``; the
    directory is created if it does not exist.

    :param check_strs: Fully-qualified names of modules providing ``health``
    :type check_strs: iterable of str
    :param namespace: Unit name, used as subdirectory of rundir
    :type namespace: str
    :param rundir: Base directory for run data
    :type rundir: str
    :param args: Extra positional arguments handed to every check
    :param kwargs: Extra keyword arguments handed to every check
    :raises RuntimeError: If a check returns False
    """
    if namespace is None:
        import socket
        namespace = socket.gethostname()

    logg.info('pid ' + str(pid))

    checks = []
    for m in check_strs:
        logg.debug('added liveness check: {}'.format(str(m)))
        checks.append(importlib.import_module(m))

    for check in checks:
        # NOTE(review): args and kwargs reach the check as two positional
        # values (a tuple and a dict), they are not unpacked. This looks
        # unintended given the health(*args, **kwargs) contract, but the
        # existing tests pass with exactly this calling form - confirm
        # before changing it to health(*args, **kwargs).
        r = check.health(args, kwargs)
        # == rather than `is`: a check returning None passes, while False
        # and 0 both fail
        if r == False:
            raise RuntimeError('liveness check {} failed'.format(str(check)))
        logg.info('liveness check passed: {}'.format(str(check)))

    app_rundir = os.path.join(rundir, namespace)
    # exist_ok: an already existing unit directory is reused
    os.makedirs(app_rundir, exist_ok=True)
    with open(os.path.join(app_rundir, 'pid'), 'w') as f:
        f.write(str(pid))
def set(error=0, namespace=default_namespace, rundir='/run'):
    """Record the current error state of the unit.

    Writes the string form of error to ``<rundir>/<namespace>/error``,
    replacing any previous content. The unit directory is not created here;
    load() creates it.

    :param error: Error state to record
    :param namespace: Unit name, used as subdirectory of rundir
    :type namespace: str
    :param rundir: Base directory for run data
    :type rundir: str
    """
    app_rundir = os.path.join(rundir, namespace)
    with open(os.path.join(app_rundir, 'error'), 'w') as f:
        f.write(str(error))
def reset(namespace=default_namespace, rundir='/run'):
    """Remove the pid and error state files of the unit.

    Each file is removed independently and a file that is already gone is
    skipped. A missing pid file therefore cannot leave a stale error file
    behind, and calling reset() twice is harmless.

    :param namespace: Unit name, used as subdirectory of rundir
    :type namespace: str
    :param rundir: Base directory for run data
    :type rundir: str
    """
    app_rundir = os.path.join(rundir, namespace)
    for name in ('pid', 'error'):
        path = os.path.join(app_rundir, name)
        try:
            os.unlink(path)
        except FileNotFoundError:
            logg.debug('liveness state file already removed: {}'.format(path))

View File

@ -0,0 +1,7 @@
from setuptools import setup
# Packaging metadata for the liveness distribution.
setup(
    name='liveness',
    version='0.0.1a6',
    packages=['liveness'],
    include_package_data=True,
)

View File

@ -0,0 +1,17 @@
#!/bin/bash

# Smoke test for health.sh, to be run from the package root.
#
# health.sh is sourced, so its `exit` on failure terminates this script too.
# The first check must pass and fall through; the second must fail, in which
# case the final echo is never reached.

# Create the run dir first: if mktemp failed, $t would be empty and the
# state files below would be written to /pid and /error.
mkdir -p ./tests/testdata/run || exit 1
export CIC_RUNDIR=$(realpath ./tests/testdata/run)
t=$(mktemp -d -p "$CIC_RUNDIR") || exit 1
export CIC_UNIT=$(basename "$t")

>&2 echo test pid $$
echo $$ > "$t/pid"
echo 0 > "$t/error"

# healthy state: execution continues after the check
. health.sh

echo 1 > "$t/error"
#unlink $t/error
# unhealthy state: the check exits this shell
. health.sh

echo if error this is not printed

View File

@ -0,0 +1,8 @@
# Reference values the fixture compares its inputs against.
a = ['foo']
kw = {'bar': 42}


def health(*args, **kwargs):
    """Liveness check fixture that receives arguments.

    Returns None, which the liveness loader does not treat as a failure.

    :raises IndexError: If called without positional arguments
    """
    # NOTE(review): the result of this comparison is discarded, so a
    # mismatching first argument goes unnoticed - presumably an assertion
    # was intended; confirm together with the calling test.
    args[0] == a[0]
    # NOTE(review): this assigns instead of comparing, so the received
    # 'bar' value is never checked - presumably `==` was intended.
    kwargs['bar'] = kw['bar']

View File

@ -0,0 +1,2 @@
def health(*args, **kwargs):
    """Liveness check fixture that always reports failure."""
    return False

View File

@ -0,0 +1,2 @@
def health(*args, **kwargs):
    """Liveness check fixture that always reports success."""
    return True

View File

@ -0,0 +1,127 @@
# standard imports
import os
import unittest
import logging
import tempfile
import socket
# local imports
import liveness.linux
## test imports
import tests.imports
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()

# Fixture layout: <directory of this file>/testdata/run/<one temp dir per test>
script_dir = os.path.realpath(os.path.dirname(__file__))
data_dir = os.path.join(script_dir, 'testdata')
run_base_dir = os.path.join(script_dir, 'testdata', 'run')
class TestImports(unittest.TestCase):
    """Exercise liveness.linux load/set/reset against a throwaway run dir."""

    def setUp(self):
        # every test gets its own run dir below testdata/run
        os.makedirs(run_base_dir, exist_ok=True)
        self.run_dir = tempfile.mkdtemp(dir=run_base_dir)
        self.unit = 'unittest'
        self.unit_dir = os.path.join(self.run_dir, self.unit)
        self.pid_path = os.path.join(self.unit_dir, 'pid')
        self.error_path = os.path.join(self.unit_dir, 'error')
        self.host_path = os.path.join(self.run_dir, socket.gethostname())

    def tearDown(self):
        # remove the per-test run dir so repeated runs do not pile up
        # directories under testdata/run
        import shutil
        shutil.rmtree(self.run_dir, ignore_errors=True)

    def _read(self, path):
        """Return the full content of the file at path."""
        with open(path, 'r') as f:
            return f.read()

    def test_no_import(self):
        liveness.linux.load([], namespace=self.unit, rundir=self.run_dir)
        self.assertEqual(str(os.getpid()), self._read(self.pid_path))

    def test_hostname(self):
        # without explicit namespace the unit dir is named after the host
        liveness.linux.load([], rundir=self.run_dir)
        r = self._read(os.path.join(self.host_path, 'pid'))
        self.assertEqual(str(os.getpid()), r)

    def test_import_single_true(self):
        checks = ['tests.imports.import_true']
        liveness.linux.load(checks, namespace=self.unit, rundir=self.run_dir)
        self.assertEqual(str(os.getpid()), self._read(self.pid_path))

    def test_import_single_false(self):
        checks = ['tests.imports.import_false']
        with self.assertRaises(RuntimeError):
            liveness.linux.load(checks, namespace=self.unit, rundir=self.run_dir)
        # a failed check must not leave a pid file behind
        with self.assertRaises(FileNotFoundError):
            os.stat(self.pid_path)

    def test_import_false_then_true(self):
        checks = ['tests.imports.import_false', 'tests.imports.import_true']
        with self.assertRaises(RuntimeError):
            liveness.linux.load(checks, namespace=self.unit, rundir=self.run_dir)
        with self.assertRaises(FileNotFoundError):
            os.stat(self.pid_path)

    def test_import_multiple_true(self):
        checks = ['tests.imports.import_true', 'tests.imports.import_true']
        liveness.linux.load(checks, namespace=self.unit, rundir=self.run_dir)
        self.assertEqual(str(os.getpid()), self._read(self.pid_path))

    def test_set(self):
        liveness.linux.load([], namespace=self.unit, rundir=self.run_dir)

        liveness.linux.set(namespace=self.unit, rundir=self.run_dir)
        self.assertEqual('0', self._read(self.error_path))

        liveness.linux.set(error=42, namespace=self.unit, rundir=self.run_dir)
        self.assertEqual('42', self._read(self.error_path))

        liveness.linux.reset(namespace=self.unit, rundir=self.run_dir)
        with self.assertRaises(FileNotFoundError):
            os.stat(self.error_path)

    def test_set_hostname(self):
        liveness.linux.load([], rundir=self.run_dir)
        liveness.linux.set(rundir=self.run_dir)
        error_path = os.path.join(self.host_path, 'error')
        self.assertEqual('0', self._read(error_path))

        liveness.linux.reset(rundir=self.run_dir)
        with self.assertRaises(FileNotFoundError):
            os.stat(error_path)

    def test_args(self):
        checks = ['tests.imports.import_args']
        # NOTE(review): args= and kwargs= are ordinary keyword arguments
        # here, so both land inside load()'s **kwargs instead of becoming
        # its *args and **kwargs - presumably not what was intended. The
        # fixture does not assert on what it receives, so this test only
        # proves that the call goes through.
        liveness.linux.load(checks, namespace=self.unit, rundir=self.run_dir, args=['foo'], kwargs={'bar': 42})
        self.assertEqual(str(os.getpid()), self._read(self.pid_path))
if __name__ == '__main__':
unittest.main()