70 Commits

Author SHA1 Message Date
lash
f66f93d867 Add generic initializationerror 2022-05-14 12:23:06 +00:00
lash
9f537ff569 Add state settings and config 2022-05-13 13:44:15 +00:00
lash
e225b4e333 Conceal wait settings behind config check 2022-05-13 07:48:48 +00:00
lash
36bd98f3df Upgrade confini 2022-05-12 18:42:54 +00:00
lash
5e6db69630 Update deps 2022-05-12 18:30:05 +00:00
lash
3a1a74c66b Bump version 2022-05-12 18:21:49 +00:00
lash
7a13d25524 Add common value processing with aiee 2022-05-12 18:21:18 +00:00
lash
f5ab76e81a Move settings processing out of class scope 2022-05-12 13:49:23 +00:00
lash
bfed5843b4 Fix nonsensical no-target handling 2022-05-12 13:21:22 +00:00
lash
4d04840e3d Fix missing sign flag handling, default fee in chain config 2022-05-12 09:31:07 +00:00
lash
65830cebed Correct type for 'unsafe' flag 2022-05-12 08:19:54 +00:00
lash
91d9654052 Add fields to block object 2022-05-12 08:06:14 +00:00
lash
272bf43ba5 Add rpc timeout proxy args 2022-05-12 06:26:34 +00:00
lash
2ad84fc5aa Fix config process ordering, all args test 2022-05-11 18:53:00 +00:00
lash
98ecf91adc WIP factor out config processing 2022-05-11 18:25:46 +00:00
lash
c739652203 WIP factor out config processing 2022-05-11 18:20:45 +00:00
lash
88cf5500bf Register flag 2022-05-11 15:41:18 +00:00
lash
14460ed76e Implement cli arg handling on aiee 2022-05-11 15:24:59 +00:00
lash
63df6aa6c4 Fix wire generation bug 2022-05-10 19:00:37 +00:00
lash
11b7c4e4e8 Make status methods properties 2022-05-09 19:21:05 +00:00
lash
23ee2f4b09 Remove commented code 2022-05-09 18:48:49 +00:00
lash
883a931495 Add common tx result fields 2022-05-09 18:46:50 +00:00
lash
04a09334ea WIP implement and test genertic tx, result, block 2022-05-09 10:00:58 +00:00
lash
45785c8e9e Remove superfluous arguments for settings instantiation 2022-05-06 07:47:04 +00:00
lash
b002976ebd Upgrade hexathon 2022-05-04 18:13:30 +00:00
lash
766027a49c Add settings module 2022-04-28 12:31:11 +00:00
lash
5726181f21 Remove log output for config load by default 2022-04-26 21:36:14 +00:00
lash
a6b9eae745 Add rpc query timeout 2022-04-10 19:08:34 +00:00
lash
d3f4da5296 Upgrade confini 2022-03-06 19:29:57 +00:00
lash
f122e2a64b Implement no-target flag 2022-02-25 11:05:56 +00:00
lash
6900c4f9a1 Allow adding of new args through overrides in man generator 2022-02-24 19:00:28 +00:00
lash
91bd428c22 Add formatting options for cli 2022-02-24 15:51:06 +00:00
lash
1db7f3f8c8 Add examples and custom content inclusion 2022-02-24 12:08:18 +00:00
lash
9b0f900eb3 Automatically apply overrides if file exists, use source dir instead of head file explicitly in man generator 2022-02-24 11:22:54 +00:00
lash
fcd78b5ca8 Flags cleanup, remove persistent wallet 2022-02-24 10:35:52 +00:00
lash
9419fbb77b Add configuration override detail in options 2022-02-23 12:08:46 +00:00
lash
49a4e4b4b8 Add remaining man page contents 2022-02-23 10:32:44 +00:00
lash
af7f977007 Add env/config descriptions from ini dir 2022-02-23 09:48:15 +00:00
lash
92fb016014 WIP add environment variable support to man generator 2022-02-22 14:47:35 +00:00
lash
3691f75f81 Add heading file argument to generator script 2022-02-22 12:11:38 +00:00
lash
299b680748 Merge remote-tracking branch 'origin/lash/man' into lash/man 2022-02-21 23:46:21 +00:00
lash
393bbd2fc2 WIP Add last arg to man maps and specs, arg flags settable through cli 2022-02-21 23:43:57 +00:00
lash
8d962e1a27 WIP add groff generator 2022-02-21 23:31:35 +00:00
lash
d638ad93ae WIP add groff generator 2022-02-21 23:30:30 +00:00
lash
68a01565c9 Make certain long flag aliases editable 2022-02-21 08:12:24 +00:00
lash
b5a01b28ac Add confini dep 2022-02-20 19:00:15 +00:00
lash
926c954d6e Add wallet passphrase to censor list 2022-02-20 18:42:30 +00:00
lash
d24fea2d4f Passphrase file in wallet key file 2022-02-20 18:27:00 +00:00
lash
d3f6d1138d Skip buggy hexathon 2022-02-14 13:56:41 +00:00
lash
2c567cbded Remove unneeded change in -vv comment 2022-01-21 10:21:17 +00:00
lash
28ee4f7197 Add log disable and env var loglevel 2022-01-21 00:13:07 +00:00
nolash
eedb63212d Bump version 2022-01-04 17:14:24 +00:00
nolash
99cac32ced Allow for query string in rpc url 2022-01-04 17:10:19 +00:00
7deeee2c84 Merge pull request 'bug: Fix documentation compile and config dumps' (#2) from lash/docs-and-dumps into master
Reviewed-on: #2
2021-12-21 15:00:02 +00:00
nolash
6a94208c68 Remove bluto 2021-12-21 14:58:09 +00:00
nolash
becd6744f6 Add option to skip ssl validation on rpc 2021-12-06 18:55:36 +01:00
nolash
06c6b2562a Replace string with object in config export suppress 2021-11-10 11:14:02 +01:00
nolash
1067c8494e Add omit sections for config exporter 2021-11-10 09:56:51 +01:00
nolash
39478d9c3c Add bluto package info renderer 2021-11-08 07:49:02 +01:00
nolash
24d7c078e8 Remove prerelease tag 2021-11-06 15:30:03 +01:00
nolash
70d8ab6a33 Add signer initializer 2021-10-30 14:10:19 +02:00
nolash
d914f64797 Remove logline span 2021-10-28 13:27:30 +02:00
nolash
5bfdb51676 Add custom fields to chain spec, harden custom field imports, improve dumpconfig handle 2021-10-28 12:16:23 +02:00
nolash
7d7209dd31 Handle missing dumpconfig arg 2021-10-27 20:53:07 +02:00
nolash
1373cb10e6 Remove unnecessary nodes 2021-10-26 19:44:54 +02:00
nolash
64dbcfcddf WIP explicit node links in texinfo docs 2021-10-26 18:20:36 +02:00
nolash
56f3cffbbf Merge remote-tracking branch 'origin/master' into lash/fix-docs-compile 2021-10-26 18:08:40 +02:00
Louis Holbrook
a71ec59fb2 Merge branch 'lash/fix-docs-compile' into 'master'
feat: chainspec, positional args, docfix

See merge request chaintool/chainlib!8
2021-10-26 14:16:00 +00:00
Louis Holbrook
095c0810f3 feat: chainspec, positional args, docfix 2021-10-26 14:15:59 +00:00
nolash
3e6d0c6912 Rename index texi file 2021-10-26 15:37:08 +02:00
30 changed files with 1313 additions and 502 deletions

View File

@@ -1,14 +1,51 @@
- 0.0.5-pending
- 0.3.0
* Implement arg handling from aiee
* Implement value argument handling from aiee
- 0.2.1
* Fix bug in wire format generation for tx
- 0.2.0
* Consolidate genertic blcok, tx and tx result objects
- 0.1.3
* Remove superfluous arguments for chain settings instantiation
- 0.1.2
* Upgrade hexathon dep
- 0.1.1
* Add settings object
- 0.1.0
* Upgrade deps
- 0.0.23
* Configuration variable descriptions
* Arg flags to names listing method
* New flags preset for reads without wallet
* Remove pesistent wallet arg flag bug
* Rename arg flag reset to flag_reset
- 0.0.22
* Man page generator script
- 0.0.21
* Log rpc reply before json parse
- 0.0.20
* Add edit option for some long flag aliases of short flags: -a, -e, -s and -y
- 0.0.19
* Passphrase file option to unlock keyfile for CLI tooling
- 0.0.18
* Upgrade hexathon skipping buggy compact hex method
- 0.0.17
* Add loglevel environment variable
- 0.0.16
* Allow query string in query url
- 0.0.14
* Add option to skip ssl verification on rpc
- 0.0.5
* Move eth code to separate package
- 0.0.4-unreleased
- 0.0.4
* Add pack tx from already signed tx struct
* Add http auth handling for jsonrpc connections
* Add customizable jsonrpc id generator (to allow for buggy server id handling)
- 0.0.3-unreleased
- 0.0.3
* Remove erc20 module (to new external package)
- 0.0.2-unreleased
- 0.0.2
*
- 0.0.1-unreleased
- 0.0.1
* Add eth tx decode
* Add eth balance query with erc20 option
* Add eth checksum address

View File

@@ -1 +1 @@
include *requirements.txt LICENSE chainlib/data/config/*
include *requirements.txt LICENSE chainlib/data/config/* chainlib/data/env/*

View File

@@ -2,7 +2,8 @@
import enum
# local imports
from chainlib.tx import Tx
from .tx import Tx
from .src import Src
class BlockSpec(enum.IntEnum):
@@ -12,23 +13,28 @@ class BlockSpec(enum.IntEnum):
LATEST = 0
class Block:
class Block(Src):
"""Base class to extend for implementation specific block object.
"""
tx_generator = Tx
def __init__(self, src=None):
self.number = None
self.txs = []
self.author = None
def src(self):
"""Return implementation specific block representation.
self.get_tx = self.tx_index_by_hash
self.tx = self.tx_by_index
:rtype: dict
:returns: Block representation
"""
return self.block_src
self.fee_limit = 0
self.fee_cost = 0
self.parent_hash = None
super(Block, self).__init__(src=src)
def tx(self, idx):
def tx_by_index(self, idx):
"""Return transaction object for transaction data at given index.
:param idx: Transaction index
@@ -39,28 +45,12 @@ class Block:
return self.tx_generator(self.txs[idx], self)
def tx_src(self, idx):
"""Return implementation specific transaction representation for transaction data at given index
:param idx: Transaction index
:type idx: int
:rtype: chainlib.tx.Tx
:returns: Transaction representation
"""
return self.txs[idx]
def tx_index_by_hash(self, hsh):
for tx in self.tx:
if tx == hsh:
return tx
return -1
def __str__(self):
return 'block {} {} ({} txs)'.format(self.number, self.hash, len(self.txs))
@classmethod
def from_src(cls, src):
"""Instantiate an implementation specific block object from the given block representation.
:param src: Block representation
:type src: dict
:rtype: chainlib.block.Block
:returns: Block object
"""
return cls(src)

View File

@@ -1,5 +1,15 @@
# standard imports
import copy
import re
def is_valid_label(v, alpha_only=False):
re_m = None
if alpha_only:
re_m = r'^[a-zA-Z]+$'
else:
re_m = r'^[a-zA-Z0-9]+$'
return re.match(re_m, v)
class ChainSpec:
@@ -16,13 +26,37 @@ class ChainSpec:
:param tag: Descriptive tag
:type tag: str
"""
def __init__(self, arch, fork, network_id, common_name=None):
def __init__(self, arch, fork, network_id, common_name=None, custom=[], safe=True):
if custom == None:
custom = []
elif not isinstance(custom, list):
raise ValueError('custom value must be list')
self.o = {
'arch': arch,
'fork': fork,
'network_id': network_id,
'common_name': common_name,
}
'arch': arch,
'fork': fork,
'network_id': network_id,
'common_name': common_name,
'custom': custom,
}
if safe:
self.validate()
def validate(self):
self.o['network_id'] = int(self.o['network_id'])
if not is_valid_label(self.o['arch'], alpha_only=True):
raise ValueError('arch: ' + self.o['arch'])
if not is_valid_label(self.o['fork'], alpha_only=True):
raise ValueError('fork: ' + self.o['fork'])
if self.o.get('common_name') and not is_valid_label(self.o['common_name']):
raise ValueError('common_name: ' + self.o['common_name'])
if self.o.get('custom'):
for i, v in enumerate(self.o['custom']):
if not is_valid_label(v):
raise ValueError('common_name {}: {}'.format(i, v))
def network_id(self):
"""Returns the network id part of the spec.
@@ -43,6 +77,12 @@ class ChainSpec:
def engine(self):
"""Alias of self.arch()
"""
return self.arch()
def arch(self):
"""Returns the chain architecture part of the spec
:rtype: str
@@ -51,6 +91,15 @@ class ChainSpec:
return self.o['arch']
def fork(self):
"""Returns the fork part of the spec
:rtype: str
:returns: fork
"""
return self.o['fork']
def common_name(self):
"""Returns the common name part of the spec
@@ -60,6 +109,21 @@ class ChainSpec:
return self.o['common_name']
def is_same_as(self, chain_spec_cmp, use_common_name=False, use_custom=False):
a = ['arch', 'fork', 'network_id']
if use_common_name:
a += ['common_name']
if use_custom:
a += ['custom']
try:
for k in a:
assert(chain_spec_cmp.o[k] == self.o[k])
except AssertionError:
return False
return True
@staticmethod
def from_chain_str(chain_str):
"""Create a new ChainSpec object from a colon-separated string, as output by the string representation of the ChainSpec object.
@@ -79,9 +143,13 @@ class ChainSpec:
if len(o) < 3:
raise ValueError('Chain string must have three sections, got {}'.format(len(o)))
common_name = None
if len(o) == 4:
if len(o) > 3:
common_name = o[3]
return ChainSpec(o[0], o[1], int(o[2]), common_name)
custom = []
if len(o) > 4:
for i in range(4, len(o)):
custom.append(o[i])
return ChainSpec(o[0], o[1], int(o[2]), common_name=common_name, custom=custom)
@staticmethod
@@ -100,20 +168,35 @@ class ChainSpec:
:rtype: chainlib.chain.ChainSpec
:returns: Resulting chain spec
"""
return ChainSpec(o['arch'], o['fork'], o['network_id'], common_name=o['common_name'])
return ChainSpec(o['arch'], o['fork'], o['network_id'], common_name=o.get('common_name'), custom=o.get('custom'))
def asdict(self):
def asdict(self, use_common_name=True, use_custom=True):
"""Create a dictionary representation of the chain spec.
:rtype: dict
:returns: Chain spec dictionary
"""
return copy.copy(self.o)
r = copy.copy(self.o)
if not use_common_name:
del r['common_name']
del r['custom']
if not use_custom:
del r['custom']
return r
def as_string(self, skip_optional=False):
s = '{}:{}:{}'.format(self.o['arch'], self.o['fork'], self.o['network_id'])
if skip_optional:
return s
if self.o.get('common_name'):
s += ':' + self.o['common_name']
if self.o.get('custom'):
s += ':' + ':'.join(self.o['custom'])
return s
def __str__(self):
s = '{}:{}:{}'.format(self.o['arch'], self.o['fork'], self.o['network_id'])
if self.o['common_name'] != None:
s += ':' + self.o['common_name']
return s
return self.as_string()

View File

@@ -3,7 +3,9 @@ from .base import (
argflag_std_read,
argflag_std_write,
argflag_std_base,
reset,
argflag_std_base_read,
flag_reset,
flag_set,
)
from .arg import ArgumentParser
from .config import Config

View File

@@ -1,182 +1,143 @@
# standard imports
import logging
import argparse
import enum
import os
import select
#import enum
#import os
#import select
import sys
#import re
# local imports
from .base import (
default_config_dir,
Flag,
argflag_std_target,
# external imports
from aiee.arg import (
ArgFlag as BaseArgFlag,
Arg as BaseArg,
process_args,
)
logg = logging.getLogger(__name__)
def stdin_arg():
"""Retreive input arguments from stdin if they exist.
Method does not block, and expects arguments to be ready on stdin before being called.
:rtype: str
:returns: Input arguments string
"""
h = select.select([sys.stdin], [], [], 0)
if len(h[0]) > 0:
v = h[0][0].read()
return v.rstrip()
return None
#def stdin_arg():
# """Retreive input arguments from stdin if they exist.
#
# Method does not block, and expects arguments to be ready on stdin before being called.
#
# :rtype: str
# :returns: Input arguments string
# """
# h = select.select([sys.stdin], [], [], 0)
# if len(h[0]) > 0:
# v = h[0][0].read()
# return v.rstrip()
# return None
class ArgumentParser(argparse.ArgumentParser):
"""Extends the standard library argument parser to construct arguments based on configuration flags.
The extended class is set up to facilitate piping of single positional arguments via stdin. For this reason, positional arguments should be added using the locally defined add_positional method instead of add_argument.
Calls chainlib.cli.args.ArgumentParser.process_flags with arg_flags and env arguments, see the method's documentation for further details.
:param arg_flags: Argument flag bit vector to generate configuration values for.
:type arg_flags: chainlib.cli.Flag
:param env: Environment variables
:type env: dict
:param usage: Usage string, passed to parent
:type usage: str
:param description: Description string, passed to parent
:type description: str
:param epilog: Epilog string, passed to parent
:type epilog: str
"""
def __init__(self, arg_flags=0x0f, env=os.environ, usage=None, description=None, epilog=None, *args, **kwargs):
super(ArgumentParser, self).__init__(usage=usage, description=description, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter, *args, **kwargs)
self.process_flags(arg_flags, env)
self.pos_args = []
def add_positional(self, name, type=str, help=None, append=False, required=True):
"""Add a positional argument.
Stdin piping will only be possible in the event a single positional argument is defined.
If the "required" is set, the resulting parsed arguments must have provided a value either from stdin or excplicitly on the command line.
:param name: Attribute name of argument
:type name: str
:param type: Argument type
:type type: str
:param help: Help string
:type help: str
:param required: If true, argument will be set to required
:type required: bool
"""
self.pos_args.append((name, type, help, required, append,))
def parse_args(self, argv=sys.argv[1:]):
"""Overrides the argparse.ArgumentParser.parse_args method.
Implements reading arguments from stdin if a single positional argument is defined (and not set to required).
If the "required" was set for the single positional argument, the resulting parsed arguments must have provided a value either from stdin or excplicitly on the command line.
:param argv: Argument vector to process
:type argv: list
"""
if len(self.pos_args) == 1:
arg = self.pos_args[0]
if arg[4]:
self.add_argument(arg[0], nargs='*', type=arg[1], default=stdin_arg(), help=arg[2])
else:
self.add_argument(arg[0], nargs='?', type=arg[1], default=stdin_arg(), help=arg[2])
else:
for arg in self.pos_args:
if arg[3]:
if arg[4]:
logg.debug('argumen')
self.add_argument(arg[0], nargs='+', type=arg[1], help=arg[2])
else:
self.add_argument(arg[0], type=arg[1], help=arg[2])
else:
if arg[4]:
self.add_argument(arg[0], nargs='*', type=arg[1], help=arg[2])
else:
self.add_argument(arg[0], type=arg[1], help=arg[2])
args = super(ArgumentParser, self).parse_args(args=argv)
if args.dumpconfig:
return args
if len(self.pos_args) == 1:
arg = self.pos_args[0]
argname = arg[0]
required = arg[3]
if getattr(args, arg[0], None) == None:
argp = stdin_arg()
if argp == None and required:
self.error('need first positional argument or value from stdin')
setattr(args, arg[0], argp)
return args
if '--dumpconfig' in argv:
argv = [argv[0], '--dumpconfig']
return super(ArgumentParser, self).parse_args(args=argv)
def process_flags(self, arg_flags, env):
"""Configures the arguments of the parser using the provided flags.
class ArgFlag(BaseArgFlag):
Environment variables are used for default values for:
def __init__(self):
super(ArgFlag, self).__init__()
CONFINI_DIR: -c, --config
CONFINI_ENV_PREFIX: --env-prefix
self.add('verbose')
self.add('config')
self.add('raw')
self.add('env')
self.add('provider')
self.add('chain_spec')
self.add('target')
self.add('unsafe')
self.add('seq')
self.add('key_file')
self.add('fee')
self.add('nonce')
self.add('no_target')
self.add('exec')
self.add('wallet')
self.add('wait')
self.add('wait_all')
self.add('send')
self.add('rpc_auth')
self.add('fmt_human')
self.add('fmt_wire')
self.add('fmt_rpc')
self.add('veryverbose')
self.add('path')
self.add('backend')
self.alias('sign', 'key_file', 'send')
self.alias('std_base', 'verbose', 'config', 'raw', 'env', 'target')
self.alias('std_base_read', 'verbose', 'config', 'raw', 'env', 'provider', 'chain_spec', 'seq')
self.alias('std_read', 'std_base', 'provider', 'chain_spec', 'unsafe', 'seq', 'sign', 'fee', 'target')
self.alias('std_write', 'verbose', 'config', 'raw', 'env', 'provider', 'chain_spec', 'unsafe', 'seq', 'key_file', 'sign', 'target', 'wait', 'wait_all', 'send', 'rpc_auth', 'nonce', 'fee')
self.alias('std_target', 'no_target', 'exec', 'wallet')
self.alias('state', 'backend', 'path')
class Arg(BaseArg):
def __init__(self, flags):
super(Arg, self).__init__(flags)
This method is called by the constructor, and is not intended to be called directly.
self.add_long('no-logs', 'verbose', typ=bool, help='Turn off all logging')
self.add('v', 'verbose', typ=bool, help='Be verbose')
self.add('vv', 'verbose', check=False, typ=bool, help='Be more verbose')
self.add('vvv', 'veryverbose', check=False, typ=bool, help='Be morse verbose with custom tracing')
:param arg_flags: Argument flag bit vector to generate configuration values for.
:type arg_flags: chainlib.cli.Flag
:param env: Environment variables
:type env: dict
"""
if arg_flags & Flag.VERBOSE:
self.add_argument('-v', action='store_true', help='Be verbose')
self.add_argument('-vv', action='store_true', help='Be more verbose')
if arg_flags & Flag.CONFIG:
self.add_argument('-c', '--config', type=str, default=env.get('CONFINI_DIR'), help='Configuration directory')
self.add_argument('-n', '--namespace', type=str, help='Configuration namespace')
self.add_argument('--dumpconfig', action='store_true', help='Output configuration and quit. Use with --raw to omit values and output schema only.')
if arg_flags & Flag.WAIT:
self.add_argument('-w', action='store_true', help='Wait for the last transaction to be confirmed')
self.add_argument('-ww', action='store_true', help='Wait for every transaction to be confirmed')
if arg_flags & Flag.ENV_PREFIX:
self.add_argument('--env-prefix', default=env.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
if arg_flags & Flag.PROVIDER:
self.add_argument('-p', '--rpc-provider', dest='p', type=str, help='RPC HTTP(S) provider url')
self.add_argument('--rpc-dialect', dest='rpc_dialect', type=str, help='RPC HTTP(S) backend dialect')
self.add_argument('--height', default='latest', help='Block height to execute against')
if arg_flags & Flag.RPC_AUTH:
self.add_argument('--rpc-auth', dest='rpc_auth', type=str, help='RPC autentication scheme')
self.add_argument('--rpc-credentials', dest='rpc_credentials', type=str, help='RPC autentication credential values')
if arg_flags & Flag.CHAIN_SPEC:
self.add_argument('-i', '--chain-spec', dest='i', type=str, help='Chain specification string')
if arg_flags & Flag.UNSAFE:
self.add_argument('-u', '--unsafe', dest='u', action='store_true', help='Do not verify address checksums')
if arg_flags & Flag.SEQ:
self.add_argument('--seq', action='store_true', help='Use sequential rpc ids')
if arg_flags & Flag.KEY_FILE:
self.add_argument('-y', '--key-file', dest='y', type=str, help='Keystore file to use for signing or address')
if arg_flags & Flag.SEND:
self.add_argument('-s', '--send', dest='s', action='store_true', help='Send to network')
if arg_flags & Flag.RAW:
self.add_argument('--raw', action='store_true', help='Do not decode output')
if arg_flags & (Flag.SIGN | Flag.NONCE):
self.add_argument('--nonce', type=int, help='override nonce')
if arg_flags & (Flag.SIGN | Flag.FEE):
self.add_argument('--fee-price', dest='fee_price', type=int, help='override fee price')
self.add_argument('--fee-limit', dest='fee_limit', type=int, help='override fee limit')
if arg_flags & argflag_std_target == 0:
arg_flags |= Flag.WALLET
if arg_flags & Flag.EXEC:
self.add_argument('-e', '--exectuable-address', dest='executable_address', type=str, help='contract address')
if arg_flags & Flag.WALLET:
self.add_argument('-a', '--recipient', dest='recipient', type=str, help='recipient address')
self.add('n', 'config', help='Configuration namespace')
self.set_long('n', 'namespace', dest='namespace')
self.add('c', 'config', dest='config', help='Configuration directory')
self.set_long('c', 'config')
self.add_long('dumpconfig', 'config', help='Output configuration and quit. Use with --raw to omit values and output schema only.')
self.add('a', 'wallet', dest='recipient', help='Recipient address')
self.set_long('a', 'recipient')
self.add('e', 'exec', dest='executable_address', help='Recipient address')
self.set_long('e', 'executable')
self.add('w', 'wait', typ=bool, help='Wait for the last transaction to be confirmed')
self.add('ww', 'wait', check=False, typ=bool, help='Wait for every transaction to be confirmed')
self.add_long('env-prefix', 'env', help='environment prefix for variables to overwrite configuration')
self.add('p', 'provider', help='RPC HTTP(S) provider url')
self.set_long('p', 'provider')
self.add_long('rpc-dialect', 'provider', help='RPC HTTP(S) backend dialect')
self.add_long('rpc-timeout', 'provider', help='RPC autentication credential values')
self.add_long('rpc-proxy', 'provider', help='RPC autentication credential values')
self.add_long('height', 'target', default='latest', help='Block height to execute against')
self.add_long('rpc-auth', 'rpc_auth', help='RPC autentication scheme')
self.add_long('rpc-credentials', 'rpc_auth', help='RPC autentication credential values')
self.add('i', 'chain_spec', help='Chain specification string')
self.set_long('i', 'chain-spec')
self.add('u', 'unsafe', typ=bool, help='Do not verify address checksums')
self.set_long('u', 'unsafe')
self.add_long('seq', 'seq', typ=bool, help='Use sequential rpc ids')
self.add('y', 'key_file', help='Keystore file to use for signing or address')
self.set_long('y', 'key_file')
self.add_long('passphrase-file', 'key_file', help='Keystore file to use for signing or address')
self.add('s', 'send', typ=bool, help='Send to network')
self.set_long('s', 'send')
self.add_long('raw', 'raw', typ=bool, help='Do not decode output')
self.add('0', 'raw', typ=bool, help='Omit newline to output')
self.add_long('nonce', 'nonce', typ=int, help='override nonce')
self.add_long('fee-price', 'fee', typ=int, help='override fee price')
self.add_long('fee-limit', 'fee', typ=int, help='override fee limit')
self.add_long('state-path', 'path', help='Path to store state data under')
self.add_long('runtime-path', 'path', help='Path to store volatile data under')
self.add_long('backend', 'backend', help='Backend to use for data storage')

View File

@@ -33,14 +33,40 @@ class Flag(enum.IntEnum):
SEND = 262144
# rpc extras - nibble 6
RPC_AUTH = 1048576
# formatting - nibble 7
FMT_HUMAN = 16777216
FMT_WIRE = 33554432
FMT_RPC = 67108864
# upper bound
MAX = 1048576
argflag_std_read = 0x23ff
argflag_std_write = 0xff31ff
argflag_std_base = 0x200f
argflag_std_target = 0x00e000
argflag_all = 0xffffff
argflag_std_read = 0x000023ff
argflag_std_write = 0x001731ff
argflag_std_base = 0x0000200f
argflag_std_base_read = 0x000000bf
argflag_std_target = 0x0000e000
argflag_all = 0x0317f7ff
def reset(flags, v):
def flag_reset(flags, v):
mask = ~(argflag_all & v)
r = flags & mask
return r
def flag_set(flags, v):
return flags | v
def flag_names(flags):
flags_debug = []
i = Flag.MAX
while True:
if flags & i > 0:
v = Flag(i)
flags_debug.append(v.name)
i >>= 1
if i == 0:
break
flags_debug.reverse()
return flags_debug

View File

@@ -2,6 +2,7 @@
import logging
import os
import sys
import stat
# external imports
import confini
@@ -35,224 +36,102 @@ class Config(confini.Config):
default_base_config_dir = default_parent_config_dir
default_fee_limit = 0
@staticmethod
def override_defaults(base_dir=None, default_fee_limit=None):
if base_dir != None:
Config.default_base_config_dir = os.path.realpath(base_dir)
if default_fee_limit != None:
Config.default_fee_limit = int(default_fee_limit)
def __init__(self, config_dir=None, namespace=None):
self.namespace = namespace
if config_dir == None:
config_dir = self.default_base_config_dir
if self.namespace != None:
config_dir = os.path.join(config_dir, namespace)
super(Config, self).__init__(config_dir)
@classmethod
def from_args(cls, args, arg_flags=0x0f, env=os.environ, extra_args={}, base_config_dir=None, default_config_dir=None, user_config_dir=None, default_fee_limit=None, logger=None, load_callback=logcallback, dump_writer=sys.stdout):
"""Parses arguments in argparse.ArgumentParser instance, then match and override configuration values that match them.
def add_user_dir(self, v):
if self.namespace != None:
v = os.path.join(v, self.namespace)
return super(Config, self).add_override_dir(v)
The method processes all known argument flags from chainlib.cli.Flag passed in the "args" argument.
All entries in extra_args may be used to associate arguments not defined in the argument flags with configuration variables, in the following manner:
def process_config(config, arg, args, flags):
- The value of argparser.ArgumentParser instance attribute with the dictionary key string is looked up.
- If the value is None (defined but empty), any existing value for the configuration directive will be kept.
- If the value of the extra_args dictionary entry is None, then the value will be stored in the configuration under the upper-case value of the key string, prefixed with "_" ("foo_bar" becomes "_FOO_BAR")
- If the value of the extra_args dictionary entries is a string, then the value will be stored in the configuration under that literal string.
if arg.match('env', flags):
config.set_env_prefix(getattr(args, 'env_prefix'))
config.process()
args_override = {}
if arg.match('raw', flags):
config.add(getattr(args, 'raw', None), '_RAW')
Missing attributes defined by both the "args" and "extra_args" arguments will both raise an AttributeError.
if arg.match('provider', flags):
args_override['RPC_PROVIDER'] = getattr(args, 'p')
args_override['RPC_DIALECT'] = getattr(args, 'rpc_dialect')
The python package "confini" is used to process and render the configuration.
if arg.match('chain_spec', flags):
args_override['CHAIN_SPEC'] = getattr(args, 'i')
The confini config schema is determined in the following manner:
if arg.match('config', flags):
config.add(getattr(args, 'namespace', None), 'CONFIG_USER_NAMESPACE')
- If nothing is set, only the config folder in chainlib.data.config will be used as schema.
- If base_config_dir is a string or list, the config directives from the path(s) will be added to the schema.
if arg.match('key_file', flags):
args_override['WALLET_KEY_FILE'] = getattr(args, 'y')
fp = getattr(args, 'passphrase_file')
if fp != None:
st = os.stat(fp)
if stat.S_IMODE(st.st_mode) & (stat.S_IRWXO | stat.S_IRWXG) > 0:
logg.warning('others than owner have access on password file')
f = open(fp, 'r')
args_override['WALLET_PASSPHRASE'] = f.read()
f.close()
config.censor('PASSPHRASE', 'WALLET')
The global override config directories are determined in the following manner:
if arg.match('backend', flags):
args_override['STATE_BACKEND'] = getattr(args, 'backend')
- If no default_config_dir is defined, the environment variable CONFINI_DIR will be used.
- If default_config_dir is a string or list, values from the config directives from the path(s) will override those defined in the schema(s).
if arg.match('path', flags):
args_override['STATE_PATH'] = getattr(args, 'state_path')
The user override config directories work the same way as the global ones, but the namespace - if defined - are dependent on them. They are only applied if the CONFIG arg flag is set. User override config directories are determined in the following manner:
config.dict_override(args_override, 'cli args', allow_empty=True)
- If --config argument is not defined and the pyxdg module is present, the first available xdg basedir is used.
- If --config argument is defined, the directory defined by its value will be used.
The namespace, if defined, will be stored under the CONFIG_USER_NAMESPACE configuration key.
:param args: Argument parser object
:type args: argparse.ArgumentParser
:param arg_flags: Argument flags defining which arguments to process into configuration.
:type arg_flags: confini.cli.args.ArgumentParser
:param env: Environment variables selection
:type env: dict
:param extra_args: Extra arguments to process and override.
:type extra_args: dict
:param base_config_dir: Path(s) to one or more directories extending the base chainlib config schema.
:type base_config_dir: list or str
:param default_config_dir: Path(s) to one or more directories overriding the defaults defined in the schema config directories.
:type default_config_dir: list or str
:param user_config_dir: User xdg config basedir, with namespace
:type user_config_dir: str
:param default_fee_limit: Default value for fee limit argument
:type default_fee_limit: int
:param logger: Logger instance to use during argument processing (will use package namespace logger if None)
:type logger: logging.Logger
:param load_callback: Callback receiving config instance as argument after config processing and load completes.
:type load_callback: function
:raises AttributeError: Attribute defined in flag not found in parsed arguments
:rtype: confini.Config
:return: Processed configuation
"""
if logger == None:
logger = logging.getLogger()
if arg_flags & Flag.CONFIG:
if args.vv:
logger.setLevel(logging.DEBUG)
elif args.v:
logger.setLevel(logging.INFO)
override_config_dirs = []
config_dir = [cls.default_base_config_dir]
if user_config_dir == None:
try:
import xdg.BaseDirectory
user_config_dir = xdg.BaseDirectory.load_first_config('chainlib/eth')
except ModuleNotFoundError:
pass
# if one or more additional base dirs are defined, add these after the default base dir
# the consecutive dirs cannot include duplicate sections
if base_config_dir != None:
logg.debug('have explicit base config addition {}'.format(base_config_dir))
if isinstance(base_config_dir, str):
base_config_dir = [base_config_dir]
for d in base_config_dir:
config_dir.append(d)
logg.debug('processing config dir {}'.format(config_dir))
# confini dir env var will be used for override configs only in this case
if default_config_dir == None:
default_config_dir = env.get('CONFINI_DIR')
if default_config_dir != None:
if isinstance(default_config_dir, str):
default_config_dir = [default_config_dir]
for d in default_config_dir:
override_config_dirs.append(d)
# process config command line arguments
if arg_flags & Flag.CONFIG:
effective_user_config_dir = getattr(args, 'config', None)
if effective_user_config_dir == None:
effective_user_config_dir = user_config_dir
if effective_user_config_dir != None:
if getattr(args, 'namespace', None) != None:
effective_user_config_dir = os.path.join(effective_user_config_dir, args.namespace)
#if config_dir == None:
# config_dir = [cls.default_base_config_dir, effective_user_config_dir]
# logg.debug('using config arg as base config addition {}'.format(effective_user_config_dir))
#else:
override_config_dirs.append(effective_user_config_dir)
logg.debug('using config arg as config override {}'.format(effective_user_config_dir))
#if config_dir == None:
# if default_config_dir == None:
# default_config_dir = default_parent_config_dir
# config_dir = default_config_dir
# override_config_dirs = []
env_prefix = getattr(args, 'env_prefix', None)
config = confini.Config(config_dir, env_prefix=env_prefix, override_dirs=override_config_dirs)
config.process()
config.add(getattr(args, 'raw'), '_RAW')
args_override = {}
if arg_flags & Flag.PROVIDER:
args_override['RPC_HTTP_PROVIDER'] = getattr(args, 'p')
args_override['RPC_PROVIDER'] = getattr(args, 'p')
args_override['RPC_DIALECT'] = getattr(args, 'rpc_dialect')
if arg_flags & Flag.CHAIN_SPEC:
args_override['CHAIN_SPEC'] = getattr(args, 'i')
if arg_flags & Flag.KEY_FILE:
args_override['WALLET_KEY_FILE'] = getattr(args, 'y')
config.dict_override(args_override, 'cli args')
if arg_flags & Flag.PROVIDER:
if arg.match('provider', flags):
if arg.match('target', flags):
config.add(getattr(args, 'height'), '_HEIGHT')
if arg_flags & Flag.UNSAFE:
config.add(getattr(args, 'u'), '_UNSAFE')
if arg_flags & (Flag.SIGN | Flag.FEE):
if arg.match('unsafe', flags):
config.add(getattr(args, 'u'), '_UNSAFE')
if arg.match('sign', flags):
config.add(getattr(args, 's'), '_RPC_SEND')
if arg.match('fee', flags):
config.add(getattr(args, 'fee_price'), '_FEE_PRICE')
fee_limit = getattr(args, 'fee_limit')
if fee_limit == None:
fee_limit = default_fee_limit
if fee_limit == None:
fee_limit = cls.default_fee_limit
fee_limit = int(config.get('CHAIN_MIN_FEE'))
config.add(fee_limit, '_FEE_LIMIT')
if arg_flags & (Flag.SIGN | Flag.NONCE):
if arg.match('nonce', flags):
config.add(getattr(args, 'nonce'), '_NONCE')
if arg_flags & Flag.SIGN:
config.add(getattr(args, 's'), '_RPC_SEND')
# handle wait
wait = 0
if args.w:
wait |= Flag.WAIT
if arg.match('wait', flags):
if args.ww:
wait |= Flag.WAIT_ALL
wait_last = wait & (Flag.WAIT | Flag.WAIT_ALL)
config.add(bool(wait_last), '_WAIT')
wait_all = wait & Flag.WAIT_ALL
config.add(bool(wait_all), '_WAIT_ALL')
config.add(True, '_WAIT_ALL')
config.add(True, '_WAIT')
elif args.w:
config.add(True, '_WAIT')
if arg.match('seq', flags):
config.add(getattr(args, 'seq'), '_SEQ')
if arg.match('wallet', flags):
config.add(getattr(args, 'recipient'), '_RECIPIENT')
if arg.match('exec', flags):
config.add(getattr(args, 'executable_address'), '_EXEC_ADDRESS')
if arg.match('rpc_auth', flags):
config.add(getattr(args, 'rpc_auth'), 'RPC_AUTH')
config.add(getattr(args, 'rpc_credentials'), 'RPC_CREDENTIALS')
if arg_flags & Flag.SEQ:
config.add(getattr(args, 'seq'), '_SEQ')
if arg_flags & Flag.WALLET:
config.add(getattr(args, 'recipient'), '_RECIPIENT')
if arg_flags & Flag.EXEC:
config.add(getattr(args, 'executable_address'), '_EXEC_ADDRESS')
if arg_flags & Flag.CONFIG:
config.add(getattr(args, 'namespace'), 'CONFIG_USER_NAMESPACE')
if arg_flags & Flag.RPC_AUTH:
config.add(getattr(args, 'rpc_auth'), 'RPC_AUTH')
config.add(getattr(args, 'rpc_credentials'), 'RPC_CREDENTIALS')
for k in extra_args.keys():
v = extra_args[k]
if v == None:
v = '_' + k.upper()
r = getattr(args, k)
existing_r = None
try:
existing_r = config.get(v)
except KeyError:
pass
if existing_r == None or r != None:
config.add(r, v, exists_ok=True)
if getattr(args, 'dumpconfig'):
config_keys = config.all()
with_values = not config.get('_RAW')
for k in config_keys:
if k[0] == '_':
continue
s = k + '='
if with_values:
v = config.get(k)
if v != None:
s += str(v)
s += '\n'
dump_writer.write(s)
sys.exit(0)
if load_callback != None:
load_callback(config)
return config
return config

357
chainlib/cli/man.py Normal file
View File

@@ -0,0 +1,357 @@
# standard imports
import os
# external imports
import confini
# local imports
from .base import (
Flag,
argflag_std_target,
)
script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, '..', 'data')
def apply_groff(collection, v, arg=None, typ='arg'):
s = ''
for flag in collection:
if len(s) > 0:
s += ', '
s += format_groff(flag, v, arg=arg, typ=typ)
s = "\n.TP\n" + s + "\n" + v
return s
def format_groff(k, v, arg=None, typ='arg'):
s = ''
if typ == 'env':
s += '\\fI'
else:
s += '\\fB'
s += k
if arg != None:
s += ' \\fI' + arg
s += '\\fP'
return s
class DocEntry:
def __init__(self, *args, typ='arg'):
self.flags = args
self.render = self.get_empty
self.groff = None
self.typ = typ
self.v = None
def __check_line_default(self, m):
if self.render == self.get_empty:
self.render = m
def get_empty(self):
s = ''
for flag in self.flags:
if len(s) > 0:
s += ', '
s += flag
s += '\n\t(undefined)\n'
return s
def set_groff(self, v):
self.__check_line_default(self.get_groff)
self.groff = v
def set_groff_argvalue(self, argvalue):
self.v = '\\fI' + argvalue + '\\fP'
def get_groff(self):
v = self.groff
if v == None:
v = self.plain
s = apply_groff(self.flags, v, arg=self.v, typ=self.typ)
return s
def __str__(self):
return self.render()
class DocGenerator:
# def __init__(self, arg_flags, config):
def __init__(self, arg_flags):
#self.config = config
self.arg_flags = arg_flags
self.docs = {}
self.envs = {}
# self.envs = {}
def __str__(self):
s = ''
ks = list(self.docs.keys())
ks.sort()
for k in ks:
s += str(self.docs[k])
env = self.envs.get(k)
if env != None:
s += ' Overrides the \\fI' + env + '\\fP configuration setting.'
s += "\n"
return s
def get_args(self):
s = ''
ks = list(self.docs.keys())
ks.sort()
for k in ks:
s += str(self.docs[k]) + "\n"
return s
def set_arg(self, k, v, flags, argvalue=None):
o = DocEntry(*flags)
o.set_groff_argvalue(argvalue)
o.set_groff(v)
self.docs[k] = o
def override_arg(self, k, v, args, argvalue=None):
o = self.docs[k]
#g.docs[v[0]].groff = v[1].rstrip()
o.set_groff(v)
if argvalue != None:
o.set_groff_argvalue(argvalue)
l = len(args)
if l > 0:
o.flags = []
for i in range(l):
o.flags.append(args[i])
#
# def process_env(self):
# for k in self.config.all():
# if k[0] == '_':
# continue
# self.envs[k] = None
def process_arg(self):
if self.arg_flags & Flag.VERBOSE:
o = DocEntry('--no-logs')
o.set_groff('Turn of logging completely. Negates \\fB-v\\fP and \\fB-vv\\fP')
self.docs['nologs'] = o
o = DocEntry('-v')
o.set_groff('Verbose. Show logs for important state changes.')
self.docs['v'] = o
o = DocEntry('-vv')
o.set_groff('Very verbose. Show logs with debugging information.')
self.docs['vv'] = o
if self.arg_flags & Flag.CONFIG:
o = DocEntry('-c', '--config')
o.set_groff('Load configuration files from given directory. All files with an .ini extension will be loaded, of which all must contain valid ini file data.')
o.set_groff_argvalue('config_dir')
self.docs['c'] = o
o = DocEntry('-n', '--namespace')
o.set_groff('Load given configuration namespace. Configuration will be loaded from the immediate configuration subdirectory with the same name.')
o.set_groff_argvalue('namespace')
self.docs['n'] = o
o = DocEntry('--dumpconfig')
o.set_groff('Output configuration settings rendered from environment and inputs. Valid arguments are \\fIini\\fP for ini file output, and \\fIenv\\fP for environment variable output. See \\fBCONFIGURATION\\fP.')
o.set_groff_argvalue('format')
self.docs['dumpconfig'] = o
if self.arg_flags & Flag.WAIT:
o = DocEntry('-w')
o.set_groff('Wait for the last transaction to be confirmed on the network. Will generate an error if the EVM execution fails.')
self.docs['w'] = o
o = DocEntry('-ww')
o.set_groff('Wait for \\fIall\\fP transactions sequentially to be confirmed on the network. Will generate an error if EVM execution fails for any of the transactions.')
self.docs['ww'] = o
if self.arg_flags & Flag.ENV_PREFIX:
o = DocEntry('--env-prefix')
o.set_groff('Environment prefix for variables to overwrite configuration. Example: If \\fB--env-prefix\\fP is set to \\fBFOO\\fP then configuration variable \\fBBAR_BAZ\\fP would be set by environment variable \\fBFOO_BAZ_BAR\\fP. Also see \\fBENVIRONMENT\\fP.')
self.docs['envprefix'] = o
if self.arg_flags & Flag.PROVIDER:
o = DocEntry('-p', '--rpc-provider')
o.set_groff('Fully-qualified URL of RPC provider.')
self.docs['p'] = o
self.envs['p'] = 'RPC_PROVIDER'
o = DocEntry('--rpc-dialect')
o.set_groff('RPC backend dialect. If specified it \\fImay\\fP help with encoding and decoding issues.')
self.docs['rpcdialect'] = o
self.envs['rpcdialect'] = 'RPC_DIALECT'
if self.arg_flags & Flag.NO_TARGET == 0:
o = DocEntry('--height')
o.set_groff('Block height at which to query state for. Does not apply to transactions.')
self.docs['height'] = o
if self.arg_flags & Flag.RPC_AUTH:
o = DocEntry('--rpc-auth')
o.set_groff('RPC endpoint authentication method, e.g. how to handle a HTTP WWW-Authenticate header.')
self.docs['rpcauth'] = o
self.envs['rpcauth'] = 'RPC_AUTH'
o = DocEntry('--rpc-credentials')
o.set_groff('RPC endpoint authentication data. Format depends on the authentication method defined in \\fB--rpc-auth\\fP.')
self.docs['rpccredentials'] = o
self.envs['rpccredentials'] = 'RPC_CREDENTIALS'
if self.arg_flags & Flag.CHAIN_SPEC:
o = DocEntry('-i', '--chain-spec')
o.set_groff('Chain specification string, in the format <engine>:<fork>:<chain_id>:<common_name>. Example: "evm:london:1:ethereum".')
o.set_groff_argvalue('chain_spec')
self.docs['i'] = o
self.envs['i'] = 'RPC_CREDENTIALS'
if self.arg_flags & Flag.UNSAFE:
o = DocEntry('-u', '--unsafe')
o.set_groff('Allow addresses that do not pass checksum.')
self.docs['u'] = o
if self.arg_flags & Flag.SEQ:
o = DocEntry('--seq')
o.set_groff('Use numeric sequencial jsonrpc query ids. Useful for buggy server implementations who expects such.')
self.docs['seq'] = o
if self.arg_flags & Flag.KEY_FILE:
o = DocEntry('-y', '--key-path')
o.set_groff('Path to signing key.')
o.set_groff_argvalue('path')
self.docs['y'] = o
self.envs['y'] = 'WALLET_KEY_FILE'
o = DocEntry('--passphrase-file')
o.set_groff('Path to file containing password to unlock key file')
o.set_groff_argvalue('path')
self.docs['passphrasefile'] = o
if self.arg_flags & Flag.SEND:
o = DocEntry('-s')
o.set_groff('Send to network. If set, network state may change. This means tokens may be spent and so on. Use with care. Only applies to transactions.')
self.docs['s'] = o
if self.arg_flags & Flag.RAW:
o = DocEntry('--raw')
o.set_groff('Produce output most optimized for machines.')
self.docs['raw'] = o
if self.arg_flags & (Flag.SIGN | Flag.NONCE):
o = DocEntry('--nonce')
o.set_groff('Explicitly set nonce to use for transaction.')
self.docs['nonce'] = o
if self.arg_flags & (Flag.SIGN | Flag.FEE):
o = DocEntry('--fee-price')
o.set_groff('Set fee unit price to offer for the transaction. If used with \\fB-s\\fP this may incur actual network token cost.')
self.docs['feeprice'] = o
o = DocEntry('--fee-limit')
o.set_groff('Set the limit of execution units for the transaction. If used with \\fB-s\\fP this may incur actual network token cost. If \\fB--fee-price\\fP is not explicitly set, the price \\fImay\\fP be retrieved from the network, and multiplied with this value to define the cost.')
self.docs['feelimit'] = o
# # TODO: this manipulation should be DRYd
# if self.arg_flags & argflag_std_target == 0:
# self.arg_flags |= Flag.WALLET
if self.arg_flags & Flag.EXEC:
o = DocEntry('-e', '--executable-address')
o.set_groff('Address of an executable code point on the network.')
self.docs['e'] = o
if self.arg_flags & Flag.WALLET:
o = DocEntry('-a', '--recipient-address')
o.set_groff('Network wallet address to operate on. For read calls, this will be the wallet address for which the query is anchored. For transaction calls, it will be the wallet address for which state will be changed.')
self.docs['a'] = o
def process(self):
self.process_arg()
# self.process_env()
class EnvDocGenerator:
def __init__(self, arg_flags, override=None):
self.arg_flags = arg_flags
self.envs = {}
env_dir = os.path.join(data_dir, 'env')
self.config = confini.Config(env_dir, override_dirs=override)
self.config.process()
def __add(self, k):
#v = format_groff(k, self.config.get(k), None, typ='env')
v = apply_groff([k], self.config.get(k), None, typ='env')
self.envs[k] = v
def process(self):
ks = []
if self.arg_flags & Flag.PROVIDER:
ks += [
'RPC_PROVIDER',
'RPC_DIALECT',
]
if self.arg_flags & Flag.RPC_AUTH:
ks += [
'RPC_AUTH',
'RPC_CREDENTIALS',
]
if self.arg_flags & Flag.CHAIN_SPEC:
ks.append('CHAIN_SPEC')
if self.arg_flags & Flag.KEY_FILE:
ks += [
'WALLET_KEY_FILE',
'WALLET_PASSPHRASE',
]
for k in ks:
self.__add(k)
def __len__(self):
return len(self.envs)
def __str__(self):
s = ''
ks = list(self.envs.keys())
ks.sort()
for k in ks:
s += str(self.envs[k]) + "\n"
return s

View File

@@ -61,7 +61,7 @@ class Rpc:
self.id_generator = IntSequenceGenerator()
self.chain_spec = config.get('CHAIN_SPEC')
self.conn = self.constructor(url=config.get('RPC_PROVIDER'), chain_spec=self.chain_spec, auth=auth)
self.conn = self.constructor(url=config.get('RPC_PROVIDER'), chain_spec=self.chain_spec, auth=auth, verify_identity=config.true('RPC_VERIFY'), timeout=float(config.get('RPC_TIMEOUT')))
return self.conn

View File

@@ -27,6 +27,10 @@ class Wallet:
self.use_checksum = False
def init(self):
self.signer = self.signer_constructor(self.keystore)
def from_config(self, config):
"""Instantiates a signer from the registered signer class, using parameters from a processed configuration.

View File

@@ -102,10 +102,14 @@ class RPCConnection:
}
__constructors_for_chains = {}
def __init__(self, url=None, chain_spec=None, auth=None):
def __init__(self, url=None, chain_spec=None, auth=None, verify_identity=True, timeout=1.0):
self.timeout = timeout
self.chain_spec = chain_spec
self.location = None
self.basic = None
self.verify_identity = verify_identity
if not self.verify_identity:
logg.warning('RPC host identity verification is OFF. Beware, you will be easy to cheat')
if url == None:
return
self.auth = auth
@@ -130,6 +134,9 @@ class RPCConnection:
self.location = os.path.join('{}://'.format(url_parsed.scheme), location)
self.location = urljoin(self.location, url_parsed.path)
if url_parsed.query != '':
self.location = urljoin(self.location, '?' + url_parsed.query)
logg.debug('parsed url {} to location {}'.format(url, self.location))
@@ -287,6 +294,11 @@ class JSONRPCHTTPConnection(HTTPConnection):
:returns: Result value part of JSON RPC response
:todo: Invalid response exception from invalid json response
"""
ssl_ctx = None
if not self.verify_identity:
import ssl
ssl_ctx = ssl.SSLContext()
ssl_ctx.verify_mode = ssl.CERT_NONE
req = Request(
self.location,
method='POST',
@@ -311,14 +323,21 @@ class JSONRPCHTTPConnection(HTTPConnection):
)
ho = build_opener(handler)
install_opener(ho)
r = None
try:
r = urlopen(req, data=data.encode('utf-8'))
r = urlopen(
req,
data=data.encode('utf-8'),
context=ssl_ctx,
timeout=self.timeout,
)
except URLError as e:
raise RPCException(e)
result = json.load(r)
logg.debug('(HTTP) recv {}'.format(result))
resp = r.read()
logg.debug('(HTTP) recv {}'.format(resp.decode('utf-8')))
result = json.loads(resp)
if o['id'] != result['id']:
raise ValueError('RPC id mismatch; sent {} received {}'.format(o['id'], result['id']))
return jsonrpc_result(result, error_parser)

View File

@@ -4,10 +4,20 @@ auth =
credentials =
dialect = default
scheme = http
verify = 1
timeout = 10.0
proxy =
[chain]
spec =
min_fee = 0
max_fee = 0
[wallet]
key_file =
passphrase =
[state]
path =
runtime_path =
backend =

14
chainlib/data/env/env.ini vendored Normal file
View File

@@ -0,0 +1,14 @@
[rpc]
provider = Fully-qualified URL to the RPC endpoint of the blockchain node.
auth = Authentication method to use for the \fIRPC_PROVIDER\fP. Currently only \fIbasic\fP is supported.
credentials = Authentication credentials to use for \fIRPC_AUTH\fP. For \fIbasic\fP authentication the value must be given as \fI<user>:<pass>\fP.
dialect = Enables translations of EVM node specific formatting and response codes.
scheme = (needs content)
verify = (needs content)
[chain]
spec = String specifying the type of chain connected to, in the format \fI<engine>:<fork>:<network_id>:<common_name>\fP. For EVM nodes the \fIengine\fP value will always be \fIevm\fP.
[wallet]
key_file = The wallet key file containing private key to use for transaction signing. Overridden by \fB-y\fP.
passphrase = Passphrase to unlock wallet. \fBWARNING:\fP it is \fBunsafe\fP to pass the passphrase as an environment variable. If the key unlocks something of value, the passphrase should rather be in a configuration file, preferably as an encrypted entry. Alternatively, a passphrase can be read from file using the \fB--passphrase-file\fP option. Files containing passphrases should only be accessible by the owner.

View File

@@ -30,9 +30,7 @@ class TxHexNormalizer:
def __hex_normalize(self, data, context):
#r = add_0x(hex_uniform(strip_0x(data)))
r = hex_uniform(strip_0x(data))
logg.debug('normalize {} {} -> {}'.format(context, data, r))
return r

View File

@@ -11,13 +11,18 @@ class JSONRPCException(RPCException):
pass
class InitializationError(Exception):
"""Base error for errors occurring while processing settings
"""
pass
class ExecutionError(Exception):
"""Base error for transaction execution failures
"""
pass
class SignerMissingException(Exception):
class SignerMissingException(InitializationError):
"""Raised when attempting to retrieve a signer when none has been added
"""

49
chainlib/settings.py Normal file
View File

@@ -0,0 +1,49 @@
# external imports
from aiee.numbers import postfix_to_int
# local imports
from .chain import ChainSpec
class ChainSettings:
def __init__(self):
self.o = {}
self.get = self.o.get
def set(self, k, v):
self.o[k] = v
def __str__(self):
ks = list(self.o.keys())
ks.sort()
s = ''
for k in ks:
s += '{}: {}\n'.format(k, self.o.get(k))
return s
def process_settings_common(settings, config):
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
settings.set('CHAIN_SPEC', chain_spec)
return settings
def process_settings_value(settings, config):
value = None
try:
value = config.get('_VALUE')
except KeyError:
return settings
value = postfix_to_int(config.get('_VALUE'))
settings.set('VALUE', value)
return settings
def process_settings(settings, config):
settings = process_settings_common(settings, config)
settings = process_settings_value(settings, config)
return settings

View File

@@ -4,6 +4,7 @@ import enum
class Status(enum.Enum):
"""Representation of transaction status in network.
"""
UNKNOWN = -1
PENDING = 0
SUCCESS = 1
ERROR = 2

View File

@@ -1,4 +1,9 @@
class Tx:
# local imports
from .status import Status
from .src import Src
class Tx(Src):
"""Base class to extend for implementation specific transaction objects.
:param src: Transaction representation source
@@ -7,8 +12,73 @@ class Tx:
:type block: chainlib.block.Block
"""
def __init__(self, src, block=None):
self.txs = []
self.src = src
def __init__(self, src=None, block=None, result=None, strict=False):
self.block = block
self.block_src = None
self.index = -1
self.fee_limit = None
self.fee_price = None
self.nonce = None
self.value = 0
self.outputs = []
self.inputs = []
self.payload = None
self.result = None
super(Tx, self).__init__(src)
if block != None:
self.apply_block(block)
if result != None:
self.apply_result(result)
def apply_result(self, result):
self.result = result
def apply_block(self, block):
self.block = block
@property
def status(self):
if self.result == None:
return None
return self.result.status
@property
def status_name(self):
if self.result == None:
return None
return self.result.status.name
def generate_wire(self, chain_spec):
pass
def as_dict(self):
raise NotImplementedError()
def __str__(self):
if self.block != None:
return 'tx {} status {} block {} index {}'.format(self.display_hash(), self.status_name(), self.block.number, self.index)
else:
return 'tx {} status {}'.format(self.display_hash(), self.hash, self.status_name())
class TxResult(Src):
def __init__(self, src=None):
self.status = Status.UNKNOWN
self.tx_index = None
self.block_hash = None
self.fee_cost = 0
super(TxResult, self).__init__(src=src)

View File

@@ -1,5 +1,3 @@
@node chainlib-cli
@section Command line interface provisions
The base CLI provisions of @code{chainlib} simplifies the generation of a some base object instances by command line arguments, environment variables and configuration schemas.

View File

@@ -1,5 +1,3 @@
@node chainlib-lib
@section Base library contents

View File

@@ -1,5 +1,4 @@
@node chainlib-config
@anchor{chainlib-config}
@section Rendering configurations
Configurations in @code{chainlib} are processed, rendered and interfaced using the @code{confini} python package.

View File

@@ -1,5 +1,4 @@
@top chainlib
@node chainlib
@chapter Chainlib
@include intro.texi

View File

@@ -1,4 +1,4 @@
@node chainlib-intro
@section Overview
Chainlib is an attempt at employing a universal interface to manipulate and access blockchains regardless of underlying architecture.

View File

@@ -1,3 +1,5 @@
funga>=0.5.1a1,<0.6.0
funga~=0.5.2
pysha3==1.0.2
hexathon~=0.0.1a8
hexathon~=0.1.7
confini~=0.6.1
aiee~=0.3.1

232
scripts/chainlib-man.py Normal file
View File

@@ -0,0 +1,232 @@
#!/usr/bin/python3
import logging
import os
import sys
import argparse
import tempfile
import shutil
from hexathon import strip_0x, add_0x
from chainlib.cli.man import (
EnvDocGenerator,
DocGenerator,
apply_groff,
)
from chainlib.cli.base import (
argflag_std_base,
flag_names,
)
from chainlib.cli.arg import ArgumentParser as ChainlibArgumentParser
from chainlib.cli.config import Config
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
configuration_description = """
.SH CONFIGURATION
All configuration settings may be overriden both by environment variables, or by overriding settings with the contents of ini-files in the directory defined by the \\fB-c\\fP option.
The active configuration, with values assigned from environment and arguments, can be output using the \\fB--dumpconfig\\fP \\fIformat\\fP option. Note that entries having keys prefixed with underscore (e.g. _SEQ) are not actual configuration settings, and thus cannot be overridden with environment variables.
To refer to a configuration setting by environment variables, the \\fIsection\\fP and \\fIkey\\fP are concatenated together with an underscore, and transformed to upper-case. For example, the configuration variable \\fIFOO_BAZ_BAR\\fP refers to an ini-file entry as follows:
.EX
[foo]
bar_baz = xyzzy
.EE
In the \\fBENVIRONMENT\\fP section below, the relevant configuration settings for this tool is listed along with a short description of its meaning.
Some configuration settings may also be overriden by command line options. Also note that the use of the \\fB-n\\fP and \\fB--env-prefix\\fP options affect how environment and configuration is read. The effects of options on how configuration settings are affective is described in the respective \\fBOPTIONS\\fP section.
"""
seealso_description = """
.SH SEE ALSO
.BP
confini-dump(1), eth-keyfile(1)
"""
legal_description = """
.SH LICENSE
This documentation and its source is licensed under the Creative Commons Attribution-Sharealike 4.0 International license.
The source code of the tool this documentation describes is licensed under the GNU General Public License 3.0.
.SH COPYRIGHT
Louis Holbrook <dev@holbrook.no> (https://holbrook.no)
PGP: 59A844A484AC11253D3A3E9DCDCBD24DD1D0E001
"""
source_description = """
.SH SOURCE CODE
https://git.defalsify.org
"""
argparser = argparse.ArgumentParser()
argparser.add_argument('-b', default=add_0x(hex(argflag_std_base)), help='argument flag bitmask')
argparser.add_argument('-c', help='config override directory')
argparser.add_argument('-n', required=True, help='tool name to use for man filename')
argparser.add_argument('-d', default='.', help='output directory')
argparser.add_argument('-v', action='store_true', help='turn on debug logging')
#argparser.add_argument('--overrides-dir', dest='overrides_dir', help='load options description override from file')
argparser.add_argument('--overrides-env-dir', dest='overrides_env_dir', help='load envionment description override config from directory')
argparser.add_argument('--overrides-config-file', dest='overrides_config_file', help='load configuration text from file')
argparser.add_argument('source_dir', help='directory containing sources for the tool man page')
args = argparser.parse_args(sys.argv[1:])
if args.v:
logg.setLevel(logging.DEBUG)
b = bytes.fromhex(strip_0x(args.b))
flags = int.from_bytes(b, byteorder='big')
flags_debug= flag_names(flags)
logg.debug('apply arg flags {}: {}'.format(flags, ', '.join(flags_debug)))
g = DocGenerator(flags)
toolname = args.n
g.process()
def apply_override(g, override_dir):
#if args.overrides_dir != None:
overrides_file = os.path.join(override_dir, toolname + '.overrides')
override = True
f = None
try:
f = open(overrides_file, 'r')
except FileNotFoundError:
logg.debug('no overrides found for {}'.format(toolname))
override = False
if override:
while True:
s = f.readline()
if len(s) == 0:
break
v = s.split('\t', maxsplit=4)
fargs = None
try:
fargs = v[2].rstrip().split(',')
except IndexError:
fargs = []
argvalue = None
if len(v) == 4:
argvalue = v[3]
try:
g.override_arg(v[0], v[1], fargs, argvalue=argvalue)
except KeyError:
logg.info('adding not previously registered key {} flags: {}'.format(v[0], ','.join(fargs)))
g.set_arg(v[0], v[1], fargs, argvalue=argvalue)
f.close()
return g
def get_head(tool_name, source_dir):
header_file = os.path.join(source_dir, tool_name + '.head.groff')
f = open(header_file, 'r')
head = f.read()
f.close()
return head
def get_examples(tool_name, source_dir):
example_file = os.path.join(source_dir, tool_name + '.examples.groff')
f = None
try:
f = open(example_file, 'r')
except FileNotFoundError:
logg.debug('no examples file found for {}'.format(tool_name))
return None
logg.info('examples file {} found for {}'.format(example_file, tool_name))
examples = f.read()
f.close()
return examples
def get_custom(tool_name, source_dir):
custom_file = os.path.join(source_dir, tool_name + '.custom.groff')
f = None
try:
f = open(custom_file, 'r')
except FileNotFoundError:
logg.debug('no custom file found for {}'.format(tool_name))
return None
logg.info('custom file {} found for {}'.format(custom_file, tool_name))
custom = f.read()
f.close()
return custom
def get_seealso(tool_name, source_dir):
seealso_file = os.path.join(source_dir, tool_name + '.seealso.groff')
f = None
try:
f = open(seealso_file, 'r')
except FileNotFoundError:
logg.debug('no seealso file found for {}'.format(tool_name))
return None
logg.info('seealso file {} found for {}'.format(seealso_file, tool_name))
seealso = f.read()
f.close()
return seealso
g = apply_override(g, args.source_dir)
ge = EnvDocGenerator(flags, override=args.overrides_env_dir)
ge.process()
head = get_head(toolname, args.source_dir)
examples = get_examples(toolname, args.source_dir)
custom = get_custom(toolname, args.source_dir)
seealso = get_seealso(toolname, args.source_dir)
if args.overrides_config_file != None:
f = open(args.overrides_config_file, 'r')
configuration_description = f.read()
f.close()
(fd, fp) = tempfile.mkstemp()
f = os.fdopen(fd, 'w')
f.write(head)
f.write(str(g))
f.write(configuration_description)
if custom != None:
f.write(custom)
if examples != None:
f.write(".SH EXAMPLES\n\n")
f.write(examples)
if seealso != None:
seealso_description = seealso
if len(ge) > 0:
f.write(".SH ENVIRONMENT\n\n")
f.write(str(ge))
f.write(legal_description)
f.write(source_description)
f.write(seealso_description)
f.close()
dest = os.path.join(args.d, toolname + '.1')
shutil.copyfile(fp, dest)
os.unlink(fp)

View File

@@ -1,10 +1,13 @@
[metadata]
name = chainlib
version = 0.0.10a6
description = Generic blockchain access library and tooling
author = Louis Holbrook
author_email = dev@holbrook.no
url = https://gitlab.com/chaintools/chainlib
name=chainlib
license=WTFPL2
author_email=dev@holbrook.no
description=Generic blockchain access library and tooling
version=0.3.0
url=https://gitlab.com/chaintools/chainlib
author=Louis Holbrook
keywords =
dlt
blockchain
@@ -13,18 +16,8 @@ classifiers =
Programming Language :: Python :: 3
Operating System :: OS Independent
Development Status :: 3 - Alpha
Topic :: Software Development :: Libraries
Environment :: Console
Intended Audience :: Developers
License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
Topic :: Internet
license = GPL3
licence_files =
LICENSE.txt
[options]
python_requires = >= 3.6
include_package_data = True
packages =
chainlib
chainlib.cli

View File

@@ -16,5 +16,15 @@ setup(
install_requires=requirements,
extras_require={
'xdg': "pyxdg~=0.27",
}
},
license_files= ('LICENSE.txt',),
python_requires = '>=3.8',
include_package_data = True,
packages = [
'chainlib',
'chainlib.cli',
],
scripts = [
'scripts/chainlib-man.py',
],
)

View File

@@ -1,40 +1,81 @@
# standard imports
import unittest
import logging
# local imports
from chainlib.chain import ChainSpec
# test imports
from tests.base import TestBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
logg.setLevel(logging.DEBUG)
class TestChain(TestBase):
def test_chain_spec_str(self):
s = ChainSpec('foo', 'bar', 3, 'baz')
self.assertEqual('foo:bar:3:baz', str(s))
s = ChainSpec('foo', 'bar', 3)
self.assertEqual('foo:bar:3', str(s))
def test_chain_spec(self):
s = ChainSpec('foo', 'bar', 3, 'baz')
self.assertEqual('foo:bar:3:baz', str(s))
s = ChainSpec('foo', 'bar', 3, 'baz', ['inky', 'pinky', 'blinky'])
self.assertEqual('foo:bar:3:baz:inky:pinky:blinky', str(s))
def test_chain_spec(self):
s = ChainSpec.from_chain_str('foo:bar:3')
s = ChainSpec.from_chain_str('foo:bar:3:baz')
s = ChainSpec.from_chain_str('foo:bar:3:baz:inky:pinky:blinky')
with self.assertRaises(ValueError):
s = ChainSpec.from_chain_str('foo:bar:a')
s = ChainSpec.from_chain_str('foo:bar')
s = ChainSpec.from_chain_str('foo')
s = ChainSpec.from_chain_str('foo1:bar:3')
s = ChainSpec.from_chain_str('foo:bar2:3')
def test_chain_spec_dict(self):
s = 'foo:bar:3:baz'
c = ChainSpec.from_chain_str('foo:bar:3:baz')
ss = 'foo:bar:3:baz:inky:pinky:blinky'
c = ChainSpec.from_chain_str(ss)
d = c.asdict()
self.assertEqual(d['arch'], 'foo')
self.assertEqual(d['fork'], 'bar')
self.assertEqual(d['network_id'], 3)
self.assertEqual(d['common_name'], 'baz')
self.assertEqual(d['custom'], ['inky', 'pinky', 'blinky'])
cc = ChainSpec.from_dict(d)
self.assertEqual(s, str(cc))
self.assertEqual(ss, str(cc))
d = c.asdict(use_custom=False)
cc = ChainSpec.from_dict(d)
self.assertEqual(str(cc), 'foo:bar:3:baz')
d = c.asdict(use_common_name=False)
cc = ChainSpec.from_dict(d)
self.assertEqual(str(cc), 'foo:bar:3')
def test_chain_spec_compare(self):
a = 'foo:bar:42:baz'
b = 'foo:bar:42:barbar'
c = 'foo:bar:42:baz:inky:pinky:blinky'
ca = ChainSpec.from_chain_str(a)
cb = ChainSpec.from_chain_str(b)
self.assertTrue(ca.is_same_as(cb))
self.assertFalse(ca.is_same_as(cb, use_common_name=True))
cc = ChainSpec.from_chain_str(c)
logg.debug('chain_spec_cmp ' + str(cc.o))
self.assertTrue(ca.is_same_as(cc))
self.assertTrue(ca.is_same_as(cc, use_common_name=True))
self.assertFalse(ca.is_same_as(cc, use_common_name=True, use_custom=True))

View File

@@ -1,39 +1,69 @@
# standard imports
import unittest
import os
import logging
# external imports
from aiee.arg import process_args
# local imports
import chainlib.cli
from chainlib.cli.base import argflag_std_base
#from chainlib.cli.base import argflag_std_base
from chainlib.cli.arg import (
ArgFlag,
Arg,
ArgumentParser,
)
from chainlib.cli.config import (
Config,
process_config,
)
script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, 'testdata')
config_dir = os.path.join(data_dir, 'config')
logging.basicConfig(level=logging.DEBUG)
class TestCli(unittest.TestCase):
def setUp(self):
self.flags = ArgFlag()
self.arg = Arg(self.flags)
def test_args_process_single(self):
ap = chainlib.cli.arg.ArgumentParser()
ap = ArgumentParser()
flags = self.flags.VERBOSE | self.flags.CONFIG
process_args(ap, self.arg, flags)
argv = [
'-vv',
'-n',
'foo',
]
args = ap.parse_args(argv)
config = chainlib.cli.config.Config.from_args(args)
config = Config(config_dir)
config = process_config(config, self.arg, args, flags)
self.assertEqual(config.get('CONFIG_USER_NAMESPACE'), 'foo')
def test_args_process_schema_override(self):
ap = chainlib.cli.arg.ArgumentParser()
ap = ArgumentParser()
flags = self.flags.VERBOSE | self.flags.CONFIG
process_args(ap, self.arg, flags)
args = ap.parse_args([])
config = chainlib.cli.config.Config.from_args(args, base_config_dir=config_dir)
config = Config(config_dir)
config = process_config(config, self.arg, args, flags)
self.assertEqual(config.get('FOO_BAR'), 'baz')
def test_args_process_arg_override(self):
ap = chainlib.cli.arg.ArgumentParser()
ap = ArgumentParser()
flags = self.flags.VERBOSE | self.flags.CONFIG
process_args(ap, self.arg, flags)
argv = [
'-c',
config_dir,
@@ -41,53 +71,57 @@ class TestCli(unittest.TestCase):
'foo',
]
args = ap.parse_args(argv)
config = chainlib.cli.config.Config.from_args(args, base_config_dir=config_dir)
config = Config(config_dir, namespace=args.namespace)
config = process_config(config, self.arg, args, flags)
self.assertEqual(config.get('FOO_BAR'), 'bazbazbaz')
def test_args_process_internal_override(self):
ap = chainlib.cli.arg.ArgumentParser()
ap = ArgumentParser()
flags = self.flags.VERBOSE | self.flags.CONFIG | self.flags.CHAIN_SPEC
process_args(ap, self.arg, flags)
args = ap.parse_args()
default_config_dir = os.path.join(config_dir, 'default')
config = chainlib.cli.config.Config.from_args(args, default_config_dir=default_config_dir)
config = Config(default_config_dir)
config = process_config(config, self.arg, args, flags)
self.assertEqual(config.get('CHAIN_SPEC'), 'baz:bar:13:foo')
user_config_dir = os.path.join(default_config_dir, 'user')
config = chainlib.cli.config.Config.from_args(args, default_config_dir=default_config_dir, user_config_dir=user_config_dir)
config = Config(default_config_dir)
config.add_override_dir(user_config_dir)
config = process_config(config, self.arg, args, flags)
self.assertEqual(config.get('CHAIN_SPEC'), 'foo:foo:666:foo')
config = chainlib.cli.config.Config.from_args(args, default_config_dir=default_config_dir, user_config_dir=default_config_dir)
config = Config(default_config_dir)
config = process_config(config, self.arg, args, flags)
self.assertEqual(config.get('CHAIN_SPEC'), 'baz:bar:13:foo')
ap = chainlib.cli.arg.ArgumentParser()
ap = ArgumentParser()
process_args(ap, self.arg, flags)
argv = [
'-n',
'user',
]
args = ap.parse_args(argv)
config = chainlib.cli.config.Config.from_args(args, default_config_dir=default_config_dir, user_config_dir=default_config_dir)
config = Config(default_config_dir, namespace=args.namespace)
config = process_config(config, self.arg, args, flags)
self.assertEqual(config.get('CHAIN_SPEC'), 'foo:foo:666:foo')
def test_args_process_extra(self):
ap = chainlib.cli.arg.ArgumentParser()
ap.add_argument('--foo', type=str)
argv = [
'--foo',
'bar',
]
args = ap.parse_args(argv)
extra_args = {
'foo': None,
}
config = chainlib.cli.config.Config.from_args(args, extra_args=extra_args)
self.assertEqual(config.get('_FOO'), 'bar')
def test_all_args(self):
ap = ArgumentParser()
flags = self.flags.all
process_args(ap, self.arg, flags)
extra_args = {
'foo': 'FOOFOO',
}
config = chainlib.cli.config.Config.from_args(args, extra_args=extra_args)
self.assertEqual(config.get('FOOFOO'), 'bar')
args = ap.parse_args([
'-y', 'foo',
'-i', 'foo:bar:42:baz',
])
config = Config()
config = process_config(config, self.arg, args, flags)
if __name__ == '__main__':