WIP factor out config processing

This commit is contained in:
lash 2022-05-11 18:20:45 +00:00
parent 88cf5500bf
commit c739652203
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
3 changed files with 401 additions and 292 deletions

View File

@ -290,7 +290,7 @@ class Arg(BaseArg):
self.set_long('n', 'namespace', dest='namespace')
self.add('c', 'config', dest='config', help='Configuration directory')
self.set_long('c', 'config')
self.add_long('dumpconfig', 'config', typ=bool, help='Output configuration and quit. Use with --raw to omit values and output schema only.')
self.add_long('dumpconfig', 'config', help='Output configuration and quit. Use with --raw to omit values and output schema only.')
self.add('w', 'wait', typ=bool, help='Wait for the last transaction to be confirmed')
self.add('ww', 'wait', check=False, typ=bool, help='Wait for every transaction to be confirmed')

View File

@ -36,278 +36,372 @@ class Config(confini.Config):
default_base_config_dir = default_parent_config_dir
default_fee_limit = 0
@staticmethod
def override_defaults(base_dir=None, default_fee_limit=None):
if base_dir != None:
Config.default_base_config_dir = os.path.realpath(base_dir)
if default_fee_limit != None:
Config.default_fee_limit = int(default_fee_limit)
def __init__(self, config_dir=None, namespace=None):
self.namespace = namespace
if config_dir == None:
config_dir = self.default_base_config_dir
if self.namespace != None:
config_dir = os.path.join(config_dir, namespace)
super(Config, self).__init__(config_dir)
@classmethod
def from_args(cls, args, arg_flags=0x0f, env=os.environ, extra_args={}, base_config_dir=None, default_config_dir=None, user_config_dir=None, default_fee_limit=None, logger=None, load_callback=None, dump_writer=sys.stdout):
"""Parses arguments in argparse.ArgumentParser instance, then match and override configuration values that match them.
def add_user_dir(self, v):
if self.namespace != None:
v = os.path.join(v, self.namespace)
return super(Config, self).add_override_dir(v)
The method processes all known argument flags from chainlib.cli.Flag passed in the "args" argument.
# @staticmethod
# def override_defaults(base_dir=None, default_fee_limit=None):
# if base_dir != None:
# Config.default_base_config_dir = os.path.realpath(base_dir)
# if default_fee_limit != None:
# Config.default_fee_limit = int(default_fee_limit)
#
#
# @classmethod
# def from_args(cls, args, arg_flags=0x0f, env=os.environ, extra_args={}, base_config_dir=None, default_config_dir=None, user_config_dir=None, default_fee_limit=None, logger=None, load_callback=None, dump_writer=sys.stdout):
# """Parses arguments in argparse.ArgumentParser instance, then match and override configuration values that match them.
#
# The method processes all known argument flags from chainlib.cli.Flag passed in the "args" argument.
#
# All entries in extra_args may be used to associate arguments not defined in the argument flags with configuration variables, in the following manner:
#
# - The value of argparser.ArgumentParser instance attribute with the dictionary key string is looked up.
# - If the value is None (defined but empty), any existing value for the configuration directive will be kept.
# - If the value of the extra_args dictionary entry is None, then the value will be stored in the configuration under the upper-case value of the key string, prefixed with "_" ("foo_bar" becomes "_FOO_BAR")
# - If the value of the extra_args dictionary entries is a string, then the value will be stored in the configuration under that literal string.
#
# Missing attributes defined by both the "args" and "extra_args" arguments will both raise an AttributeError.
#
# The python package "confini" is used to process and render the configuration.
#
# The confini config schema is determined in the following manner:
#
# - If nothing is set, only the config folder in chainlib.data.config will be used as schema.
# - If base_config_dir is a string or list, the config directives from the path(s) will be added to the schema.
#
# The global override config directories are determined in the following manner:
#
# - If no default_config_dir is defined, the environment variable CONFINI_DIR will be used.
# - If default_config_dir is a string or list, values from the config directives from the path(s) will override those defined in the schema(s).
#
# The user override config directories work the same way as the global ones, but the namespace - if defined - are dependent on them. They are only applied if the CONFIG arg flag is set. User override config directories are determined in the following manner:
#
# - If --config argument is not defined and the pyxdg module is present, the first available xdg basedir is used.
# - If --config argument is defined, the directory defined by its value will be used.
#
# The namespace, if defined, will be stored under the CONFIG_USER_NAMESPACE configuration key.
#
# :param args: Argument parser object
# :type args: argparse.ArgumentParser
# :param arg_flags: Argument flags defining which arguments to process into configuration.
# :type arg_flags: confini.cli.args.ArgumentParser
# :param env: Environment variables selection
# :type env: dict
# :param extra_args: Extra arguments to process and override.
# :type extra_args: dict
# :param base_config_dir: Path(s) to one or more directories extending the base chainlib config schema.
# :type base_config_dir: list or str
# :param default_config_dir: Path(s) to one or more directories overriding the defaults defined in the schema config directories.
# :type default_config_dir: list or str
# :param user_config_dir: User xdg config basedir, with namespace
# :type user_config_dir: str
# :param default_fee_limit: Default value for fee limit argument
# :type default_fee_limit: int
# :param logger: Logger instance to use during argument processing (will use package namespace logger if None)
# :type logger: logging.Logger
# :param load_callback: Callback receiving config instance as argument after config processing and load completes.
# :type load_callback: function
# :raises AttributeError: Attribute defined in flag not found in parsed arguments
# :rtype: confini.Config
# :return: Processed configuation
# """
# env_prefix = getattr(args, 'env_prefix', None)
# env_prefix_str = env_prefix
# if env_prefix_str == None:
# env_prefix_str = ''
# else:
# env_prefix_str += '_'
#
# env_loglevel_key_str = env_prefix_str + 'LOGLEVEL'
# env_loglevel = os.environ.get(env_loglevel_key_str)
#
# if logger == None:
# logger = logging.getLogger()
#
# if env_loglevel != None:
# env_loglevel = env_loglevel.lower()
# if env_loglevel == '0' or env_loglevel == 'no' or env_loglevel == 'none' or env_loglevel == 'disable' or env_loglevel == 'disabled' or env_loglevel == 'off':
# logging.disable()
# elif env_loglevel == '1' or env_loglevel == 'err' or env_loglevel == 'error':
# logger.setLevel(logging.ERROR)
# elif env_loglevel == '2' or env_loglevel == 'warning' or env_loglevel == 'warn':
# logger.setLevel(logging.WARNING)
# elif env_loglevel == '3' or env_loglevel == 'info':
# logger.setLevel(logging.INFO)
# else:
# valid_level = False
# try:
# num_loglevel = int(env_loglevel)
# valid_level = True
# except:
# if env_loglevel == 'debug':
# valid_level = True
#
# if not valid_level:
# raise ValueError('unknown loglevel {} set in environment variable {}'.format(env_loglevel, env_loglevel_key_str))
#
# logger.setLevel(logging.DEBUG)
#
#
# if arg_flags & Flag.VERBOSE:
# if args.vv:
# logger.setLevel(logging.DEBUG)
# elif args.v:
# logger.setLevel(logging.INFO)
# if args.no_logs:
# logging.disable()
#
# override_config_dirs = []
# config_dir = [cls.default_base_config_dir]
#
# if user_config_dir == None:
# try:
# import xdg.BaseDirectory
# user_config_dir = xdg.BaseDirectory.load_first_config('chainlib/eth')
# except ModuleNotFoundError:
# pass
#
# # if one or more additional base dirs are defined, add these after the default base dir
# # the consecutive dirs cannot include duplicate sections
# if base_config_dir != None:
# logg.debug('have explicit base config addition {}'.format(base_config_dir))
# if isinstance(base_config_dir, str):
# base_config_dir = [base_config_dir]
# for d in base_config_dir:
# config_dir.append(d)
# logg.debug('processing config dir {}'.format(config_dir))
#
# # confini dir env var will be used for override configs only in this case
# if default_config_dir == None:
# default_config_dir = env.get('CONFINI_DIR')
# if default_config_dir != None:
# if isinstance(default_config_dir, str):
# default_config_dir = [default_config_dir]
# for d in default_config_dir:
# override_config_dirs.append(d)
#
# # process config command line arguments
# if arg_flags & Flag.CONFIG:
# effective_user_config_dir = getattr(args, 'config', None)
# if effective_user_config_dir == None:
# effective_user_config_dir = user_config_dir
# if effective_user_config_dir != None:
# if getattr(args, 'namespace', None) != None:
# effective_user_config_dir = os.path.join(effective_user_config_dir, args.namespace)
# #if config_dir == None:
# # config_dir = [cls.default_base_config_dir, effective_user_config_dir]
# # logg.debug('using config arg as base config addition {}'.format(effective_user_config_dir))
# #else:
# override_config_dirs.append(effective_user_config_dir)
# logg.debug('using config arg as config override {}'.format(effective_user_config_dir))
#
# #if config_dir == None:
# # if default_config_dir == None:
# # default_config_dir = default_parent_config_dir
# # config_dir = default_config_dir
# # override_config_dirs = []
#
# config = confini.Config(config_dir, env_prefix=env_prefix, override_dirs=override_config_dirs)
# config.process()
#
# if arg_flags & Flag.RAW > 0:
# config.add(getattr(args, 'raw'), '_RAW')
#
# args_override = {}
#
# if arg_flags & Flag.PROVIDER:
# args_override['RPC_PROVIDER'] = getattr(args, 'p')
# args_override['RPC_DIALECT'] = getattr(args, 'rpc_dialect')
# if arg_flags & Flag.CHAIN_SPEC:
# args_override['CHAIN_SPEC'] = getattr(args, 'i')
# if arg_flags & Flag.KEY_FILE:
# args_override['WALLET_KEY_FILE'] = getattr(args, 'y')
# fp = getattr(args, 'passphrase_file')
# if fp != None:
# st = os.stat(fp)
# if stat.S_IMODE(st.st_mode) & (stat.S_IRWXO | stat.S_IRWXG) > 0:
# logg.warning('others than owner have access on password file')
# f = open(fp, 'r')
# args_override['WALLET_PASSPHRASE'] = f.read()
# f.close()
# config.censor('PASSPHRASE', 'WALLET')
# config.dict_override(args_override, 'cli args', allow_empty=True)
#
# if arg_flags & (Flag.PROVIDER | Flag.NO_TARGET) == Flag.PROVIDER:
# config.add(getattr(args, 'height'), '_HEIGHT')
# if arg_flags & Flag.UNSAFE:
# config.add(getattr(args, 'u'), '_UNSAFE')
# if arg_flags & (Flag.SIGN | Flag.FEE):
# config.add(getattr(args, 'fee_price'), '_FEE_PRICE')
# fee_limit = getattr(args, 'fee_limit')
# if fee_limit == None:
# fee_limit = default_fee_limit
# if fee_limit == None:
# fee_limit = cls.default_fee_limit
# config.add(fee_limit, '_FEE_LIMIT')
# if arg_flags & (Flag.SIGN | Flag.NONCE):
# config.add(getattr(args, 'nonce'), '_NONCE')
#
# if arg_flags & Flag.SIGN:
# config.add(getattr(args, 's'), '_RPC_SEND')
#
# # handle wait
# wait = 0
# if args.w:
# wait |= Flag.WAIT
# if args.ww:
# wait |= Flag.WAIT_ALL
# wait_last = wait & (Flag.WAIT | Flag.WAIT_ALL)
# config.add(bool(wait_last), '_WAIT')
# wait_all = wait & Flag.WAIT_ALL
# config.add(bool(wait_all), '_WAIT_ALL')
#
#
# if arg_flags & Flag.SEQ:
# config.add(getattr(args, 'seq'), '_SEQ')
# if arg_flags & Flag.WALLET:
# config.add(getattr(args, 'recipient'), '_RECIPIENT')
# if arg_flags & Flag.EXEC:
# config.add(getattr(args, 'executable_address'), '_EXEC_ADDRESS')
#
# if arg_flags & Flag.CONFIG:
# config.add(getattr(args, 'namespace'), 'CONFIG_USER_NAMESPACE')
#
# if arg_flags & Flag.RPC_AUTH:
# config.add(getattr(args, 'rpc_auth'), 'RPC_AUTH')
# config.add(getattr(args, 'rpc_credentials'), 'RPC_CREDENTIALS')
#
# for k in extra_args.keys():
# logg.debug('extra_agrs {}'.format(k))
# v = extra_args[k]
# if v == None:
# v = '_' + k.upper()
# r = getattr(args, k)
# existing_r = None
# try:
# existing_r = config.get(v)
# except KeyError:
# pass
# if existing_r == None or r != None:
# config.add(r, v, exists_ok=True)
# logg.debug('added {} to {}'.format(r, v))
#
# if getattr(args, 'dumpconfig', None):
# if args.dumpconfig == 'ini':
# from confini.export import ConfigExporter
# exporter = ConfigExporter(config, target=sys.stdout, doc=False)
# exporter.export(exclude_sections=['config'])
# elif args.dumpconfig == 'env':
# from confini.env import export_env
# export_env(config)
#
## config_keys = config.all()
## with_values = not config.get('_RAW')
## for k in config_keys:
## if k[0] == '_':
## continue
## s = k + '='
## if with_values:
## v = config.get(k)
## if v != None:
## s += str(v)
## s += '\n'
## dump_writer.write(s)
# sys.exit(0)
#
# if load_callback != None:
# load_callback(config)
#
# return config
#
#
def process_config(config, arg, args, flags):
All entries in extra_args may be used to associate arguments not defined in the argument flags with configuration variables, in the following manner:
if arg.match('env', flags):
config.set_env_prefix(getattr(args, 'env_prefix'))
- The value of argparser.ArgumentParser instance attribute with the dictionary key string is looked up.
- If the value is None (defined but empty), any existing value for the configuration directive will be kept.
- If the value of the extra_args dictionary entry is None, then the value will be stored in the configuration under the upper-case value of the key string, prefixed with "_" ("foo_bar" becomes "_FOO_BAR")
- If the value of the extra_args dictionary entries is a string, then the value will be stored in the configuration under that literal string.
args_override = {}
if arg.match('raw', flags):
config.add(getattr(args, 'raw', None), '_RAW')
Missing attributes defined by both the "args" and "extra_args" arguments will both raise an AttributeError.
if arg.match('provider', flags):
args_override['RPC_PROVIDER'] = getattr(args, 'p')
args_override['RPC_DIALECT'] = getattr(args, 'rpc_dialect')
The python package "confini" is used to process and render the configuration.
if arg.match('chain_spec', flags):
args_override['CHAIN_SPEC'] = getattr(args, 'i')
The confini config schema is determined in the following manner:
if arg.match('config', flags):
config.add(getattr(args, 'namespace', None), 'CONFIG_USER_NAMESPACE')
- If nothing is set, only the config folder in chainlib.data.config will be used as schema.
- If base_config_dir is a string or list, the config directives from the path(s) will be added to the schema.
if arg.match('key_file', flags):
args_override['WALLET_KEY_FILE'] = getattr(args, 'y')
fp = getattr(args, 'passphrase_file')
if fp != None:
st = os.stat(fp)
if stat.S_IMODE(st.st_mode) & (stat.S_IRWXO | stat.S_IRWXG) > 0:
logg.warning('others than owner have access on password file')
f = open(fp, 'r')
args_override['WALLET_PASSPHRASE'] = f.read()
f.close()
config.censor('PASSPHRASE', 'WALLET')
config.dict_override(args_override, 'cli args', allow_empty=True)
The global override config directories are determined in the following manner:
config.process()
- If no default_config_dir is defined, the environment variable CONFINI_DIR will be used.
- If default_config_dir is a string or list, values from the config directives from the path(s) will override those defined in the schema(s).
The user override config directories work the same way as the global ones, but the namespace - if defined - are dependent on them. They are only applied if the CONFIG arg flag is set. User override config directories are determined in the following manner:
- If --config argument is not defined and the pyxdg module is present, the first available xdg basedir is used.
- If --config argument is defined, the directory defined by its value will be used.
The namespace, if defined, will be stored under the CONFIG_USER_NAMESPACE configuration key.
:param args: Argument parser object
:type args: argparse.ArgumentParser
:param arg_flags: Argument flags defining which arguments to process into configuration.
:type arg_flags: confini.cli.args.ArgumentParser
:param env: Environment variables selection
:type env: dict
:param extra_args: Extra arguments to process and override.
:type extra_args: dict
:param base_config_dir: Path(s) to one or more directories extending the base chainlib config schema.
:type base_config_dir: list or str
:param default_config_dir: Path(s) to one or more directories overriding the defaults defined in the schema config directories.
:type default_config_dir: list or str
:param user_config_dir: User xdg config basedir, with namespace
:type user_config_dir: str
:param default_fee_limit: Default value for fee limit argument
:type default_fee_limit: int
:param logger: Logger instance to use during argument processing (will use package namespace logger if None)
:type logger: logging.Logger
:param load_callback: Callback receiving config instance as argument after config processing and load completes.
:type load_callback: function
:raises AttributeError: Attribute defined in flag not found in parsed arguments
:rtype: confini.Config
:return: Processed configuation
"""
env_prefix = getattr(args, 'env_prefix', None)
env_prefix_str = env_prefix
if env_prefix_str == None:
env_prefix_str = ''
else:
env_prefix_str += '_'
env_loglevel_key_str = env_prefix_str + 'LOGLEVEL'
env_loglevel = os.environ.get(env_loglevel_key_str)
if logger == None:
logger = logging.getLogger()
if env_loglevel != None:
env_loglevel = env_loglevel.lower()
if env_loglevel == '0' or env_loglevel == 'no' or env_loglevel == 'none' or env_loglevel == 'disable' or env_loglevel == 'disabled' or env_loglevel == 'off':
logging.disable()
elif env_loglevel == '1' or env_loglevel == 'err' or env_loglevel == 'error':
logger.setLevel(logging.ERROR)
elif env_loglevel == '2' or env_loglevel == 'warning' or env_loglevel == 'warn':
logger.setLevel(logging.WARNING)
elif env_loglevel == '3' or env_loglevel == 'info':
logger.setLevel(logging.INFO)
else:
valid_level = False
try:
num_loglevel = int(env_loglevel)
valid_level = True
except:
if env_loglevel == 'debug':
valid_level = True
if not valid_level:
raise ValueError('unknown loglevel {} set in environment variable {}'.format(env_loglevel, env_loglevel_key_str))
logger.setLevel(logging.DEBUG)
return config
if arg_flags & Flag.VERBOSE:
if args.vv:
logger.setLevel(logging.DEBUG)
elif args.v:
logger.setLevel(logging.INFO)
if args.no_logs:
logging.disable()
override_config_dirs = []
config_dir = [cls.default_base_config_dir]
if flags & (Flag.PROVIDER | Flag.NO_TARGET) == Flag.PROVIDER:
config.add(getattr(args, 'height'), '_HEIGHT')
if flags & Flag.UNSAFE:
config.add(getattr(args, 'u'), '_UNSAFE')
if flags & (Flag.SIGN | Flag.FEE):
config.add(getattr(args, 'fee_price'), '_FEE_PRICE')
fee_limit = getattr(args, 'fee_limit')
if fee_limit == None:
fee_limit = default_fee_limit
if fee_limit == None:
fee_limit = cls.default_fee_limit
config.add(fee_limit, '_FEE_LIMIT')
if flags & (Flag.SIGN | Flag.NONCE):
config.add(getattr(args, 'nonce'), '_NONCE')
if user_config_dir == None:
try:
import xdg.BaseDirectory
user_config_dir = xdg.BaseDirectory.load_first_config('chainlib/eth')
except ModuleNotFoundError:
pass
if flags & Flag.SIGN:
config.add(getattr(args, 's'), '_RPC_SEND')
# if one or more additional base dirs are defined, add these after the default base dir
# the consecutive dirs cannot include duplicate sections
if base_config_dir != None:
logg.debug('have explicit base config addition {}'.format(base_config_dir))
if isinstance(base_config_dir, str):
base_config_dir = [base_config_dir]
for d in base_config_dir:
config_dir.append(d)
logg.debug('processing config dir {}'.format(config_dir))
# confini dir env var will be used for override configs only in this case
if default_config_dir == None:
default_config_dir = env.get('CONFINI_DIR')
if default_config_dir != None:
if isinstance(default_config_dir, str):
default_config_dir = [default_config_dir]
for d in default_config_dir:
override_config_dirs.append(d)
# process config command line arguments
if arg_flags & Flag.CONFIG:
effective_user_config_dir = getattr(args, 'config', None)
if effective_user_config_dir == None:
effective_user_config_dir = user_config_dir
if effective_user_config_dir != None:
if getattr(args, 'namespace', None) != None:
effective_user_config_dir = os.path.join(effective_user_config_dir, args.namespace)
#if config_dir == None:
# config_dir = [cls.default_base_config_dir, effective_user_config_dir]
# logg.debug('using config arg as base config addition {}'.format(effective_user_config_dir))
#else:
override_config_dirs.append(effective_user_config_dir)
logg.debug('using config arg as config override {}'.format(effective_user_config_dir))
#if config_dir == None:
# if default_config_dir == None:
# default_config_dir = default_parent_config_dir
# config_dir = default_config_dir
# override_config_dirs = []
config = confini.Config(config_dir, env_prefix=env_prefix, override_dirs=override_config_dirs)
config.process()
if arg_flags & Flag.RAW > 0:
config.add(getattr(args, 'raw'), '_RAW')
args_override = {}
if arg_flags & Flag.PROVIDER:
args_override['RPC_PROVIDER'] = getattr(args, 'p')
args_override['RPC_DIALECT'] = getattr(args, 'rpc_dialect')
if arg_flags & Flag.CHAIN_SPEC:
args_override['CHAIN_SPEC'] = getattr(args, 'i')
if arg_flags & Flag.KEY_FILE:
args_override['WALLET_KEY_FILE'] = getattr(args, 'y')
fp = getattr(args, 'passphrase_file')
if fp != None:
st = os.stat(fp)
if stat.S_IMODE(st.st_mode) & (stat.S_IRWXO | stat.S_IRWXG) > 0:
logg.warning('others than owner have access on password file')
f = open(fp, 'r')
args_override['WALLET_PASSPHRASE'] = f.read()
f.close()
config.censor('PASSPHRASE', 'WALLET')
config.dict_override(args_override, 'cli args', allow_empty=True)
if arg_flags & (Flag.PROVIDER | Flag.NO_TARGET) == Flag.PROVIDER:
config.add(getattr(args, 'height'), '_HEIGHT')
if arg_flags & Flag.UNSAFE:
config.add(getattr(args, 'u'), '_UNSAFE')
if arg_flags & (Flag.SIGN | Flag.FEE):
config.add(getattr(args, 'fee_price'), '_FEE_PRICE')
fee_limit = getattr(args, 'fee_limit')
if fee_limit == None:
fee_limit = default_fee_limit
if fee_limit == None:
fee_limit = cls.default_fee_limit
config.add(fee_limit, '_FEE_LIMIT')
if arg_flags & (Flag.SIGN | Flag.NONCE):
config.add(getattr(args, 'nonce'), '_NONCE')
if arg_flags & Flag.SIGN:
config.add(getattr(args, 's'), '_RPC_SEND')
# handle wait
wait = 0
if args.w:
wait |= Flag.WAIT
if args.ww:
wait |= Flag.WAIT_ALL
wait_last = wait & (Flag.WAIT | Flag.WAIT_ALL)
config.add(bool(wait_last), '_WAIT')
wait_all = wait & Flag.WAIT_ALL
config.add(bool(wait_all), '_WAIT_ALL')
# handle wait
wait = 0
if args.w:
wait |= Flag.WAIT
if args.ww:
wait |= Flag.WAIT_ALL
wait_last = wait & (Flag.WAIT | Flag.WAIT_ALL)
config.add(bool(wait_last), '_WAIT')
wait_all = wait & Flag.WAIT_ALL
config.add(bool(wait_all), '_WAIT_ALL')
if arg_flags & Flag.SEQ:
config.add(getattr(args, 'seq'), '_SEQ')
if arg_flags & Flag.WALLET:
config.add(getattr(args, 'recipient'), '_RECIPIENT')
if arg_flags & Flag.EXEC:
config.add(getattr(args, 'executable_address'), '_EXEC_ADDRESS')
if flags & Flag.SEQ:
config.add(getattr(args, 'seq'), '_SEQ')
if flags & Flag.WALLET:
config.add(getattr(args, 'recipient'), '_RECIPIENT')
if flags & Flag.EXEC:
config.add(getattr(args, 'executable_address'), '_EXEC_ADDRESS')
if flags & Flag.RPC_AUTH:
config.add(getattr(args, 'rpc_auth'), 'RPC_AUTH')
config.add(getattr(args, 'rpc_credentials'), 'RPC_CREDENTIALS')
if arg_flags & Flag.CONFIG:
config.add(getattr(args, 'namespace'), 'CONFIG_USER_NAMESPACE')
if arg_flags & Flag.RPC_AUTH:
config.add(getattr(args, 'rpc_auth'), 'RPC_AUTH')
config.add(getattr(args, 'rpc_credentials'), 'RPC_CREDENTIALS')
for k in extra_args.keys():
logg.debug('extra_agrs {}'.format(k))
v = extra_args[k]
if v == None:
v = '_' + k.upper()
r = getattr(args, k)
existing_r = None
try:
existing_r = config.get(v)
except KeyError:
pass
if existing_r == None or r != None:
config.add(r, v, exists_ok=True)
logg.debug('added {} to {}'.format(r, v))
if getattr(args, 'dumpconfig', None):
if args.dumpconfig == 'ini':
from confini.export import ConfigExporter
exporter = ConfigExporter(config, target=sys.stdout, doc=False)
exporter.export(exclude_sections=['config'])
elif args.dumpconfig == 'env':
from confini.env import export_env
export_env(config)
# config_keys = config.all()
# with_values = not config.get('_RAW')
# for k in config_keys:
# if k[0] == '_':
# continue
# s = k + '='
# if with_values:
# v = config.get(k)
# if v != None:
# s += str(v)
# s += '\n'
# dump_writer.write(s)
sys.exit(0)
if load_callback != None:
load_callback(config)
return config

View File

@ -7,14 +7,16 @@ import logging
from aiee.arg import process_args
# local imports
import chainlib.cli
#from chainlib.cli.base import argflag_std_base
from chainlib.cli.arg import (
ArgFlag,
Arg,
ArgumentParser,
)
from chainlib.cli.config import (
Config,
process_config,
)
script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, 'testdata')
config_dir = os.path.join(data_dir, 'config')
@ -40,23 +42,25 @@ class TestCli(unittest.TestCase):
'foo',
]
args = ap.parse_args(argv)
config = chainlib.cli.config.Config.from_args(args, arg_flags=flags)
config = Config(config_dir)
config = process_config(config, self.arg, args, flags)
self.assertEqual(config.get('CONFIG_USER_NAMESPACE'), 'foo')
def test_args_process_schema_override(self):
ap = chainlib.cli.arg.ArgumentParser()
ap = ArgumentParser()
flags = self.flags.VERBOSE | self.flags.CONFIG
process_args(ap, self.arg, flags)
args = ap.parse_args([])
config = chainlib.cli.config.Config.from_args(args, arg_flags=flags, base_config_dir=config_dir)
config = Config(config_dir)
config = process_config(config, self.arg, args, flags)
self.assertEqual(config.get('FOO_BAR'), 'baz')
def test_args_process_arg_override(self):
ap = chainlib.cli.arg.ArgumentParser()
ap = ArgumentParser()
flags = self.flags.VERBOSE | self.flags.CONFIG
process_args(ap, self.arg, flags)
@ -67,59 +71,70 @@ class TestCli(unittest.TestCase):
'foo',
]
args = ap.parse_args(argv)
config = chainlib.cli.config.Config.from_args(args, arg_flags=flags, base_config_dir=config_dir)
config = Config(config_dir, namespace=args.namespace)
config = process_config(config, self.arg, args, flags)
self.assertEqual(config.get('FOO_BAR'), 'bazbazbaz')
def test_args_process_internal_override(self):
ap = chainlib.cli.arg.ArgumentParser()
flags = self.flags.VERBOSE | self.flags.CONFIG
ap = ArgumentParser()
flags = self.flags.VERBOSE | self.flags.CONFIG | self.flags.CHAIN_SPEC
process_args(ap, self.arg, flags)
args = ap.parse_args()
default_config_dir = os.path.join(config_dir, 'default')
config = chainlib.cli.config.Config.from_args(args, arg_flags=flags, default_config_dir=default_config_dir)
config = Config(default_config_dir)
config = process_config(config, self.arg, args, flags)
self.assertEqual(config.get('CHAIN_SPEC'), 'baz:bar:13:foo')
user_config_dir = os.path.join(default_config_dir, 'user')
config = chainlib.cli.config.Config.from_args(args, arg_flags=flags, default_config_dir=default_config_dir, user_config_dir=user_config_dir)
config = Config(default_config_dir)
config.add_override_dir(user_config_dir)
config = process_config(config, self.arg, args, flags)
self.assertEqual(config.get('CHAIN_SPEC'), 'foo:foo:666:foo')
config = chainlib.cli.config.Config.from_args(args, arg_flags=flags, default_config_dir=default_config_dir, user_config_dir=default_config_dir)
config = Config(default_config_dir)
config = process_config(config, self.arg, args, flags)
self.assertEqual(config.get('CHAIN_SPEC'), 'baz:bar:13:foo')
ap = chainlib.cli.arg.ArgumentParser()
ap = ArgumentParser()
process_args(ap, self.arg, flags)
argv = [
'-n',
'user',
]
args = ap.parse_args(argv)
config = chainlib.cli.config.Config.from_args(args, arg_flags=flags, default_config_dir=default_config_dir, user_config_dir=default_config_dir)
config = Config(default_config_dir, namespace=args.namespace)
config = process_config(config, self.arg, args, flags)
self.assertEqual(config.get('CHAIN_SPEC'), 'foo:foo:666:foo')
def test_args_process_extra(self):
ap = chainlib.cli.arg.ArgumentParser()
flags = self.flags.VERBOSE | self.flags.CONFIG
process_args(ap, self.arg, flags)
ap.add_argument('--foo', type=str)
argv = [
'--foo',
'bar',
]
args = ap.parse_args(argv)
extra_args = {
'foo': None,
}
config = chainlib.cli.config.Config.from_args(args, arg_flags=flags, extra_args=extra_args)
self.assertEqual(config.get('_FOO'), 'bar')
extra_args = {
'foo': 'FOOFOO',
}
config = chainlib.cli.config.Config.from_args(args, arg_flags=flags, extra_args=extra_args)
self.assertEqual(config.get('FOOFOO'), 'bar')
# def test_args_process_extra(self):
# ap = ArgumentParser()
# flags = self.flags.VERBOSE | self.flags.CONFIG
# process_args(ap, self.arg, flags)
# ap.add_argument('--foo', type=str)
# argv = [
# '--foo',
# 'bar',
# ]
# args = ap.parse_args(argv)
# extra_args = {
# 'foo': None,
# }
#
# config = Config()
# config = process_config(config, self.arg, args, flags)
# self.assertEqual(config.get('_FOO'), 'bar')
#
# extra_args = {
# 'foo': 'FOOFOO',
# }
#
# config = Config()
# config = process_config(config, self.arg, args, flags)
# self.assertEqual(config.get('FOOFOO'), 'bar')
if __name__ == '__main__':