Add custom fields to chain spec, harden custom field imports, improve dumpconfig handle

This commit is contained in:
nolash 2021-10-28 12:16:23 +02:00
parent 7d7209dd31
commit 5bfdb51676
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
5 changed files with 142 additions and 21 deletions

View File

@ -1,5 +1,15 @@
# standard imports # standard imports
import copy import copy
import re
def is_valid_label(v, alpha_only=False):
re_m = None
if alpha_only:
re_m = r'^[a-zA-Z]+$'
else:
re_m = r'^[a-zA-Z0-9]+$'
return re.match(re_m, v)
class ChainSpec: class ChainSpec:
@ -16,14 +26,38 @@ class ChainSpec:
:param tag: Descriptive tag :param tag: Descriptive tag
:type tag: str :type tag: str
""" """
def __init__(self, arch, fork, network_id, common_name=None): def __init__(self, arch, fork, network_id, common_name=None, custom=[], safe=True):
if custom == None:
custom = []
elif not isinstance(custom, list):
raise ValueError('custom value must be list')
self.o = { self.o = {
'arch': arch, 'arch': arch,
'fork': fork, 'fork': fork,
'network_id': network_id, 'network_id': network_id,
'common_name': common_name, 'common_name': common_name,
'custom': custom,
} }
if safe:
self.validate()
def validate(self):
self.o['network_id'] = int(self.o['network_id'])
if not is_valid_label(self.o['arch'], alpha_only=True):
raise ValueError('arch: ' + self.o['arch'])
if not is_valid_label(self.o['fork'], alpha_only=True):
raise ValueError('fork: ' + self.o['fork'])
if self.o.get('common_name') and not is_valid_label(self.o['common_name']):
raise ValueError('common_name: ' + self.o['common_name'])
if self.o.get('custom'):
for i, v in enumerate(self.o['custom']):
if not is_valid_label(v):
raise ValueError('common_name {}: {}'.format(i, v))
def network_id(self): def network_id(self):
"""Returns the network id part of the spec. """Returns the network id part of the spec.
@ -43,6 +77,12 @@ class ChainSpec:
def engine(self): def engine(self):
"""Alias of self.arch()
"""
return self.arch()
def arch(self):
"""Returns the chain architecture part of the spec """Returns the chain architecture part of the spec
:rtype: str :rtype: str
@ -51,6 +91,15 @@ class ChainSpec:
return self.o['arch'] return self.o['arch']
def fork(self):
"""Returns the fork part of the spec
:rtype: str
:returns: fork
"""
return self.o['fork']
def common_name(self): def common_name(self):
"""Returns the common name part of the spec """Returns the common name part of the spec
@ -60,6 +109,21 @@ class ChainSpec:
return self.o['common_name'] return self.o['common_name']
def is_same_as(self, chain_spec_cmp, use_common_name=False, use_custom=False):
a = ['arch', 'fork', 'network_id']
if use_common_name:
a += ['common_name']
if use_custom:
a += ['custom']
try:
for k in a:
assert(chain_spec_cmp.o[k] == self.o[k])
except AssertionError:
return False
return True
@staticmethod @staticmethod
def from_chain_str(chain_str): def from_chain_str(chain_str):
"""Create a new ChainSpec object from a colon-separated string, as output by the string representation of the ChainSpec object. """Create a new ChainSpec object from a colon-separated string, as output by the string representation of the ChainSpec object.
@ -79,9 +143,13 @@ class ChainSpec:
if len(o) < 3: if len(o) < 3:
raise ValueError('Chain string must have three sections, got {}'.format(len(o))) raise ValueError('Chain string must have three sections, got {}'.format(len(o)))
common_name = None common_name = None
if len(o) == 4: if len(o) > 3:
common_name = o[3] common_name = o[3]
return ChainSpec(o[0], o[1], int(o[2]), common_name) custom = []
if len(o) > 4:
for i in range(4, len(o)):
custom.append(o[i])
return ChainSpec(o[0], o[1], int(o[2]), common_name=common_name, custom=custom)
@staticmethod @staticmethod
@ -100,20 +168,28 @@ class ChainSpec:
:rtype: chainlib.chain.ChainSpec :rtype: chainlib.chain.ChainSpec
:returns: Resulting chain spec :returns: Resulting chain spec
""" """
return ChainSpec(o['arch'], o['fork'], o['network_id'], common_name=o['common_name']) return ChainSpec(o['arch'], o['fork'], o['network_id'], common_name=o.get('common_name'), custom=o.get('custom'))
def asdict(self): def asdict(self, use_common_name=True, use_custom=True):
"""Create a dictionary representation of the chain spec. """Create a dictionary representation of the chain spec.
:rtype: dict :rtype: dict
:returns: Chain spec dictionary :returns: Chain spec dictionary
""" """
return copy.copy(self.o) r = copy.copy(self.o)
if not use_common_name:
del r['common_name']
del r['custom']
if not use_custom:
del r['custom']
return r
def __str__(self): def __str__(self):
s = '{}:{}:{}'.format(self.o['arch'], self.o['fork'], self.o['network_id']) s = '{}:{}:{}'.format(self.o['arch'], self.o['fork'], self.o['network_id'])
if self.o['common_name'] != None: if self.o.get('common_name'):
s += ':' + self.o['common_name'] s += ':' + self.o['common_name']
if self.o.get('custom'):
s += ':' + ':'.join(self.o['custom'])
return s return s

View File

@ -236,7 +236,8 @@ class Config(confini.Config):
if existing_r == None or r != None: if existing_r == None or r != None:
config.add(r, v, exists_ok=True) config.add(r, v, exists_ok=True)
if getattr(args, 'dumpconfig', None) != None: logg.debug('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>' + str(getattr(args, 'dumpconfig')))
if getattr(args, 'dumpconfig', None):
config_keys = config.all() config_keys = config.all()
with_values = not config.get('_RAW') with_values = not config.get('_RAW')
for k in config_keys: for k in config_keys:

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = chainlib name = chainlib
version = 0.0.10a8 version = 0.0.10a9
description = Generic blockchain access library and tooling description = Generic blockchain access library and tooling
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no

View File

@ -1,40 +1,81 @@
# standard imports
import unittest import unittest
import logging
# local imports
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
# test imports
from tests.base import TestBase from tests.base import TestBase
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
logg.setLevel(logging.DEBUG)
class TestChain(TestBase): class TestChain(TestBase):
def test_chain_spec_str(self): def test_chain_spec_str(self):
s = ChainSpec('foo', 'bar', 3, 'baz')
self.assertEqual('foo:bar:3:baz', str(s))
s = ChainSpec('foo', 'bar', 3) s = ChainSpec('foo', 'bar', 3)
self.assertEqual('foo:bar:3', str(s)) self.assertEqual('foo:bar:3', str(s))
def test_chain_spec(self): s = ChainSpec('foo', 'bar', 3, 'baz')
self.assertEqual('foo:bar:3:baz', str(s))
s = ChainSpec('foo', 'bar', 3, 'baz', ['inky', 'pinky', 'blinky'])
self.assertEqual('foo:bar:3:baz:inky:pinky:blinky', str(s))
def test_chain_spec(self):
s = ChainSpec.from_chain_str('foo:bar:3') s = ChainSpec.from_chain_str('foo:bar:3')
s = ChainSpec.from_chain_str('foo:bar:3:baz') s = ChainSpec.from_chain_str('foo:bar:3:baz')
s = ChainSpec.from_chain_str('foo:bar:3:baz:inky:pinky:blinky')
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
s = ChainSpec.from_chain_str('foo:bar:a') s = ChainSpec.from_chain_str('foo:bar:a')
s = ChainSpec.from_chain_str('foo:bar') s = ChainSpec.from_chain_str('foo:bar')
s = ChainSpec.from_chain_str('foo') s = ChainSpec.from_chain_str('foo')
s = ChainSpec.from_chain_str('foo1:bar:3')
s = ChainSpec.from_chain_str('foo:bar2:3')
def test_chain_spec_dict(self): def test_chain_spec_dict(self):
s = 'foo:bar:3:baz' ss = 'foo:bar:3:baz:inky:pinky:blinky'
c = ChainSpec.from_chain_str('foo:bar:3:baz') c = ChainSpec.from_chain_str(ss)
d = c.asdict() d = c.asdict()
self.assertEqual(d['arch'], 'foo') self.assertEqual(d['arch'], 'foo')
self.assertEqual(d['fork'], 'bar') self.assertEqual(d['fork'], 'bar')
self.assertEqual(d['network_id'], 3) self.assertEqual(d['network_id'], 3)
self.assertEqual(d['common_name'], 'baz') self.assertEqual(d['common_name'], 'baz')
self.assertEqual(d['custom'], ['inky', 'pinky', 'blinky'])
cc = ChainSpec.from_dict(d) cc = ChainSpec.from_dict(d)
self.assertEqual(s, str(cc)) self.assertEqual(ss, str(cc))
d = c.asdict(use_custom=False)
cc = ChainSpec.from_dict(d)
self.assertEqual(str(cc), 'foo:bar:3:baz')
d = c.asdict(use_common_name=False)
cc = ChainSpec.from_dict(d)
self.assertEqual(str(cc), 'foo:bar:3')
def test_chain_spec_compare(self):
a = 'foo:bar:42:baz'
b = 'foo:bar:42:barbar'
c = 'foo:bar:42:baz:inky:pinky:blinky'
ca = ChainSpec.from_chain_str(a)
cb = ChainSpec.from_chain_str(b)
self.assertTrue(ca.is_same_as(cb))
self.assertFalse(ca.is_same_as(cb, use_common_name=True))
cc = ChainSpec.from_chain_str(c)
logg.debug('chain_spec_cmp ' + str(cc.o))
self.assertTrue(ca.is_same_as(cc))
self.assertTrue(ca.is_same_as(cc, use_common_name=True))
self.assertFalse(ca.is_same_as(cc, use_common_name=True, use_custom=True))

View File

@ -1,6 +1,7 @@
# standard imports # standard imports
import unittest import unittest
import os import os
import logging
# local imports # local imports
import chainlib.cli import chainlib.cli
@ -10,6 +11,8 @@ script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, 'testdata') data_dir = os.path.join(script_dir, 'testdata')
config_dir = os.path.join(data_dir, 'config') config_dir = os.path.join(data_dir, 'config')
logging.basicConfig(level=logging.DEBUG)
class TestCli(unittest.TestCase): class TestCli(unittest.TestCase):