Compare commits

...

4 Commits

Author SHA1 Message Date
fb3253ae55 save 2022-02-21 08:34:25 +03:00
d3f65798f1 feat: add interactive token deployment 2022-02-10 11:22:21 +03:00
1498edbb07
Adds capability to deploy demurrage tokens. 2021-12-15 16:00:51 +03:00
10cbb1344d
Cosmetic clean ups. 2021-12-15 16:00:18 +03:00
38 changed files with 2096 additions and 287 deletions

6
.gitignore vendored
View File

@ -1,3 +1,9 @@
__pycache__ __pycache__
*.pyc *.pyc
*.egg-info *.egg-info
.venv
build
.vscode
.idea
contracts
*.egg

402
.pylintrc Normal file
View File

@ -0,0 +1,402 @@
[MASTER]
# Specify a configuration file.
#rcfile=
# Python code to execute, usually for sys.path manipulation such as
# pygtk.require().
#init-hook=
# Add files or directories to the blacklist. They should be base names, not
# paths.
ignore=third_party
# Add files or directories matching the regex patterns to the blacklist. The
# regex matches against base names, not paths.
ignore-patterns=object_detection_grpc_client.py,prediction_pb2.py,prediction_pb2_grpc.py
# Pickle collected data for later comparisons.
persistent=no
# List of plugins (as comma separated values of python modules names) to load,
# usually to register additional checkers.
load-plugins=
# Use multiple processes to speed up Pylint.
jobs=4
# Allow loading of arbitrary C extensions. Extensions are imported into the
# active Python interpreter and may run arbitrary code.
unsafe-load-any-extension=no
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code
extension-pkg-whitelist=
[MESSAGES CONTROL]
# Only show warnings with the listed confidence levels. Leave empty to show
# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED
confidence=
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
# multiple time (only on the command line, not in the configuration file where
# it should appear only once). See also the "--disable" option for examples.
#enable=
# Disable the message, report, category or checker with the given id(s). You
# can either give multiple identifiers separated by comma (,) or put this
# option multiple times (only on the command line, not in the configuration
# file where it should appear only once).You can also use "--disable=all" to
# disable everything first and then reenable specific checks. For example, if
# you want to run only the similarities checker, you can use "--disable=all
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
#
# Kubeflow disables string-interpolation because we are starting to use f
# style strings
disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,missing-docstring,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,relative-import,invalid-name,bad-continuation,no-member,locally-disabled,fixme,import-error,too-many-locals,no-name-in-module,too-many-instance-attributes,no-self-use,logging-fstring-interpolation
[REPORTS]
# Set the output format. Available formats are text, parseable, colorized, msvs
# (visual studio) and html. You can also give a reporter class, eg
# mypackage.mymodule.MyReporterClass.
output-format=text
# Put messages in a separate file for each module / package specified on the
# command line instead of printing them on stdout. Reports (if any) will be
# written in a file name "pylint_global.[txt|html]". This option is deprecated
# and it will be removed in Pylint 2.0.
files-output=no
# Tells whether to display a full report or only the messages
reports=no
# Python expression which should return a note less than 10 (10 is the highest
# note). You have access to the variables errors warning, statement which
# respectively contain the number of errors / warnings messages and the total
# number of statements analyzed. This is used by the global evaluation report
# (RP0004).
evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
# Template used to display messages. This is a python new-style format string
# used to format the message information. See doc for all details
#msg-template=
[BASIC]
# Good variable names which should always be accepted, separated by a comma
good-names=i,j,k,ex,Run,_
# Bad variable names which should always be refused, separated by a comma
bad-names=foo,bar,baz,toto,tutu,tata
# Colon-delimited sets of names that determine each other's naming style when
# the name regexes allow several styles.
name-group=
# Include a hint for the correct naming format with invalid-name
include-naming-hint=no
# List of decorators that produce properties, such as abc.abstractproperty. Add
# to this list to register other decorators that produce valid properties.
property-classes=abc.abstractproperty
# Regular expression matching correct function names
function-rgx=[a-z_][a-z0-9_]{2,30}$
# Naming hint for function names
function-name-hint=[a-z_][a-z0-9_]{2,30}$
# Regular expression matching correct variable names
variable-rgx=[a-z_][a-z0-9_]{2,30}$
# Naming hint for variable names
variable-name-hint=[a-z_][a-z0-9_]{2,30}$
# Regular expression matching correct constant names
const-rgx=(([A-Z_][A-Z0-9_]*)|(__.*__))$
# Naming hint for constant names
const-name-hint=(([A-Z_][A-Z0-9_]*)|(__.*__))$
# Regular expression matching correct attribute names
attr-rgx=[a-z_][a-z0-9_]{2,30}$
# Naming hint for attribute names
attr-name-hint=[a-z_][a-z0-9_]{2,30}$
# Regular expression matching correct argument names
argument-rgx=[a-z_][a-z0-9_]{2,30}$
# Naming hint for argument names
argument-name-hint=[a-z_][a-z0-9_]{2,30}$
# Regular expression matching correct class attribute names
class-attribute-rgx=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$
# Naming hint for class attribute names
class-attribute-name-hint=([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$
# Regular expression matching correct inline iteration names
inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$
# Naming hint for inline iteration names
inlinevar-name-hint=[A-Za-z_][A-Za-z0-9_]*$
# Regular expression matching correct class names
class-rgx=[A-Z_][a-zA-Z0-9]+$
# Naming hint for class names
class-name-hint=[A-Z_][a-zA-Z0-9]+$
# Regular expression matching correct module names
module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
# Naming hint for module names
module-name-hint=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
# Regular expression matching correct method names
method-rgx=[a-z_][a-z0-9_]{2,30}$
# Naming hint for method names
method-name-hint=[a-z_][a-z0-9_]{2,30}$
# Regular expression which should only match function or class names that do
# not require a docstring.
no-docstring-rgx=^_
# Minimum line length for functions/classes that require docstrings, shorter
# ones are exempt.
docstring-min-length=-1
[ELIF]
# Maximum number of nested blocks for function / method body
max-nested-blocks=5
[TYPECHECK]
# Tells whether missing members accessed in mixin class should be ignored. A
# mixin class is detected if its name ends with "mixin" (case insensitive).
ignore-mixin-members=yes
# List of module names for which member attributes should not be checked
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis. It
# supports qualified module names, as well as Unix pattern matching.
ignored-modules=
# List of class names for which member attributes should not be checked (useful
# for classes with dynamically set attributes). This supports the use of
# qualified names.
ignored-classes=optparse.Values,thread._local,_thread._local
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E1101 when accessed. Python regular
# expressions are accepted.
generated-members=
# List of decorators that produce context managers, such as
# contextlib.contextmanager. Add to this list to register other decorators that
# produce valid context managers.
contextmanager-decorators=contextlib.contextmanager
[FORMAT]
# Maximum number of characters on a single line.
max-line-length=140
# Regexp for a line that is allowed to be longer than the limit.
ignore-long-lines=^\s*(# )?<?https?://\S+>?$
# Allow the body of an if to be on the same line as the test if there is no
# else.
single-line-if-stmt=no
# List of optional constructs for which whitespace checking is disabled. `dict-
# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}.
# `trailing-comma` allows a space between comma and closing bracket: (a, ).
# `empty-line` allows space-only lines.
no-space-check=trailing-comma,dict-separator
# Maximum number of lines in a module
max-module-lines=1000
# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1
# tab).
# Use 2 spaces consistent with TensorFlow style.
indent-string=' '
# Number of spaces of indent required inside a hanging or continued line.
indent-after-paren=4
# Expected format of line ending, e.g. empty (any line ending), LF or CRLF.
expected-line-ending-format=
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
notes=FIXME,XXX,TODO
[VARIABLES]
# Tells whether we should check for unused import in __init__ files.
init-import=no
# A regular expression matching the name of dummy variables (i.e. expectedly
# not used).
dummy-variables-rgx=(_+[a-zA-Z0-9]*?$)|dummy
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid to define new builtins when possible.
additional-builtins=
# List of strings which can identify a callback function by name. A callback
# name must start or end with one of those strings.
callbacks=cb_,_cb
# List of qualified module names which can have objects that can redefine
# builtins.
redefining-builtins-modules=six.moves,future.builtins
[LOGGING]
# Logging modules to check that the string format arguments are in logging
# function parameter format
logging-modules=logging
[SIMILARITIES]
# Minimum lines number of a similarity.
min-similarity-lines=4
# Ignore comments when computing similarities.
ignore-comments=yes
# Ignore docstrings when computing similarities.
ignore-docstrings=yes
# Ignore imports when computing similarities.
ignore-imports=no
[SPELLING]
# Spelling dictionary name. Available dictionaries: none. To make it working
# install python-enchant package.
spelling-dict=
# List of comma separated words that should not be checked.
spelling-ignore-words=
# A path to a file that contains private dictionary; one word per line.
spelling-private-dict-file=
# Tells whether to store unknown words to indicated private dictionary in
# --spelling-private-dict-file option instead of raising a message.
spelling-store-unknown-words=no
[IMPORTS]
# Deprecated modules which should not be used, separated by a comma
deprecated-modules=regsub,TERMIOS,Bastion,rexec
# Create a graph of every (i.e. internal and external) dependencies in the
# given file (report RP0402 must not be disabled)
import-graph=
# Create a graph of external dependencies in the given file (report RP0402 must
# not be disabled)
ext-import-graph=
# Create a graph of internal dependencies in the given file (report RP0402 must
# not be disabled)
int-import-graph=
# Force import order to recognize a module as part of the standard
# compatibility libraries.
known-standard-library=
# Force import order to recognize a module as part of a third party library.
known-third-party=enchant
# Analyse import fallback blocks. This can be used to support both Python 2 and
# 3 compatible code, which means that the block might have code that exists
# only in one or another interpreter, leading to false positives when analysed.
analyse-fallback-blocks=no
[DESIGN]
# Maximum number of arguments for function / method
max-args=7
# Argument names that match this expression will be ignored. Default to name
# with leading underscore
ignored-argument-names=_.*
# Maximum number of locals for function / method body
max-locals=15
# Maximum number of return / yield for function / method body
max-returns=6
# Maximum number of branch for function / method body
max-branches=12
# Maximum number of statements in function / method body
max-statements=50
# Maximum number of parents for a class (see R0901).
max-parents=7
# Maximum number of attributes for a class (see R0902).
max-attributes=7
# Minimum number of public methods for a class (see R0903).
min-public-methods=0
# Maximum number of public methods for a class (see R0904).
max-public-methods=20
# Maximum number of boolean expressions in a if statement
max-bool-expr=5
[CLASSES]
# List of method names used to declare (i.e. assign) instance attributes.
defining-attr-methods=__init__,__new__,setUp
# List of valid names for the first argument in a class method.
valid-classmethod-first-arg=cls
# List of valid names for the first argument in a metaclass class method.
valid-metaclass-classmethod-first-arg=mcs
# List of member names, which should be excluded from the protected access
# warning.
exclude-protected=_asdict,_fields,_replace,_source,_make
[EXCEPTIONS]
# Exceptions that will emit a warning when being caught. Defaults to
# "Exception"
overgeneral-exceptions=Exception

File diff suppressed because one or more lines are too long

101
cic/MetaRequestHandler.py Normal file
View File

@ -0,0 +1,101 @@
from __future__ import annotations
# standard imports
import json
import logging
import os
from typing import TYPE_CHECKING, Dict, Union
from cic_types.condiments import MetadataPointer
from cic_types.ext.metadata.signer import Signer
from cic_types.processor import generate_metadata_pointer
if TYPE_CHECKING:
from cic.cmd.arg import CmdCtrl
from cic.http import HTTPSession
# local imports
# external imports
logg = logging.getLogger(__file__)
class Metadata:
"""
:cvar base_url: The base url or the metadata server.
:type base_url: str
"""
base_url = None
ctrl: CmdCtrl = None
class MetadataRequestsHandler(Metadata):
def __init__(
self,
cic_type: MetadataPointer,
identifier: bytes,
engine: str = "pgp",
):
""""""
logg.debug(f"ctrl: {self.ctrl}")
self.opener: HTTPSession = self.ctrl.remote_openers["meta"]
self.cic_type = cic_type
self.engine = engine
self.headers = {"X-CIC-AUTOMERGE": "server", "Content-Type": "application/json"}
self.identifier = identifier
if cic_type == MetadataPointer.NONE:
self.metadata_pointer = identifier.hex()
else:
self.metadata_pointer = generate_metadata_pointer(
identifier=self.identifier, cic_type=self.cic_type
)
if self.base_url:
self.url = os.path.join(self.base_url, self.metadata_pointer)
def create(self, data: Union[Dict, str]):
""""""
data = json.dumps(data).encode("utf-8")
result = self.opener.open(
method="POST", url=self.url, data=data, headers=self.headers
)
logg.debug(
f"url: {self.url}, data: {data}, headers: {self.headers}, result: {result}"
)
metadata = json.loads(result)
return self.edit(data=metadata)
def edit(self, data: Union[Dict, str]):
""""""
cic_meta_signer = Signer()
signature = cic_meta_signer.sign_digest(data=data)
algorithm = cic_meta_signer.get_operational_key().get("algo")
formatted_data = {
"m": json.dumps(data),
"s": {
"engine": self.engine,
"algo": algorithm,
"data": signature,
"digest": data.get("digest"),
},
}
formatted_data = json.dumps(formatted_data).encode("utf-8")
result = self.opener.open(
method="PUT", url=self.url, data=formatted_data, headers=self.headers
)
logg.info(f"signed metadata submission returned: {result}.")
try:
decoded_identifier = self.identifier.decode("utf-8")
except UnicodeDecodeError:
decoded_identifier = self.identifier.hex()
return result
def query(self):
""""""
result = self.opener.open(method="GET", url=self.url)
result_data = json.loads(result)
if not isinstance(result_data, dict):
raise ValueError(f"invalid result data object: {result_data}.")
return result

0
cic/actions/__init__.py Normal file
View File

96
cic/actions/deploy.py Normal file
View File

@ -0,0 +1,96 @@
from __future__ import annotations
# standard imports
import importlib
import logging
import os
from typing import TYPE_CHECKING
# local imports
from cic import Processor, Proof
from cic.attachment import Attachment
from cic.meta import Meta, MetadataWriter
from cic.network import Network
from cic.output import HTTPWriter, KeyedWriterFactory
from cic.token import Token
# external imports
from cic.MetaRequestHandler import MetadataRequestsHandler
from cic_types.ext.metadata.signer import Signer as MetadataSigner
logg = logging.getLogger(__name__)
if TYPE_CHECKING:
from cic.cmd.arg import CmdCtrl
from cic.actions.types import Options
def init_writers_from_config(config):
w = {
'meta': None,
'attachment': None,
'proof': None,
'ext': None,
}
for v in w.keys():
k = 'CIC_CORE_{}_WRITER'.format(v.upper())
(d, c) = config.get(k).rsplit('.', maxsplit=1)
m = importlib.import_module(d)
o = getattr(m, c)
w[v] = o
return w
def deploy(ctrl: CmdCtrl, target: str, contract_directory: str, keystore_directory: str, options: Options):
auth_passphrase=options.auth_passphrase,
auth_key_file_path=options.auth_keyfile_path,
metadata_endpoint=options.metadata_endpoint,
modname = f'cic.ext.{target}'
cmd_mod = importlib.import_module(modname)
writers = init_writers_from_config(ctrl.config)
output_directory = os.path.join(contract_directory, 'out')
output_writer_path_meta = output_directory
if metadata_endpoint != None:
MetadataRequestsHandler.base_url = metadata_endpoint
MetadataRequestsHandler.ctrl = ctrl
MetadataSigner.gpg_path = '/tmp'
MetadataSigner.key_file_path = auth_key_file_path # This is a p2p key for add data to meta
MetadataSigner.gpg_passphrase = auth_passphrase
writers['proof'] = KeyedWriterFactory(MetadataWriter, None).new
writers['attachment'] = KeyedWriterFactory(None, HTTPWriter).new
writers['meta'] = MetadataWriter
output_writer_path_meta = metadata_endpoint
ct = Token(path=contract_directory)
cm = Meta(path=contract_directory, writer=writers['meta'](path=output_writer_path_meta))
ca = Attachment(path=contract_directory, writer=writers['attachment'](path=output_writer_path_meta))
cp = Proof(path=contract_directory, attachments=ca, writer=writers['proof'](path=output_writer_path_meta))
cn = Network(path=contract_directory)
ca.load()
ct.load()
cp.load()
cm.load()
cn.load()
chain_spec = None
try:
chain_spec = ctrl.config.get('CHAIN_SPEC')
except KeyError:
chain_spec = cn.chain_spec
ctrl.config.add(chain_spec, 'CHAIN_SPEC', exists_ok=True)
logg.debug(f'CHAIN_SPEC config set to {str(chain_spec)}')
(rpc, signer) = cmd_mod.parse_adapter(ctrl.config, keystore_directory)
target_network_reference = cn.resource(target)
chain_spec = cn.chain_spec(target)
logg.debug(f'found reference {target_network_reference["contents"]} chain spec {chain_spec} for target {target}')
c = getattr(cmd_mod, 'new')(chain_spec, target_network_reference['contents'], cp, signer_hint=signer, rpc=rpc, outputs_writer=writers['ext'](path=output_directory))
c.apply_token(ct)
p = Processor(proof=cp, attachment=ca, metadata=cm, extensions=[c])
p.process()

28
cic/actions/types.py Normal file
View File

@ -0,0 +1,28 @@
from collections import namedtuple
Contract = namedtuple(
"Contract",
[
"token",
"proof",
"meta",
"attachment",
"network",
],
)
Options = namedtuple(
"Options",
[
"auth_db_path",
"auth_keyfile_path",
"auth_passphrase",
"contract_registry",
"key_account",
"chain_spec",
"rpc_provider",
"metadata_endpoint",
"wallet_keyfile",
"wallet_passphrase",
],
)

View File

@ -1,6 +1,6 @@
# standard imports # standard imports
import os
import logging import logging
import os
# local imports # local imports
from .base import * from .base import *

74
cic/auth.py Normal file
View File

@ -0,0 +1,74 @@
# standard imports
import hashlib
import logging
import os
# external imports
import gnupg
# local imports
from cic.errors import AuthError
logg = logging.getLogger(__name__)
class PGPAuthCrypt:
typ = "gnupg"
def __init__(self, db_dir, auth_key, pgp_dir=None):
self.db_dir = db_dir
try:
bytes.fromhex(auth_key)
except TypeError:
raise AuthError(f"invalid key {auth_key}") from TypeError
except ValueError:
raise AuthError(f"invalid key {auth_key}") from ValueError
self.auth_key = auth_key
self.gpg = gnupg.GPG(gnupghome=pgp_dir)
self.secret = None
self.__passphrase = None
def get_secret(self, passphrase=""):
if passphrase is None:
passphrase = ""
p = os.path.join(self.db_dir, ".secret")
try:
f = open(p, "rb")
except FileNotFoundError:
h = hashlib.sha256()
h.update(bytes.fromhex(self.auth_key))
h.update(passphrase.encode("utf-8"))
z = h.digest()
secret = self.gpg.encrypt(z, [self.auth_key], always_trust=True)
if not secret.ok:
raise AuthError(f"could not encrypt secret for {self.auth_key}") from FileNotFoundError
d = os.path.dirname(p)
os.makedirs(d, exist_ok=True)
f = open(p, "wb")
f.write(secret.data)
f.close()
f = open(p, "rb")
secret = self.gpg.decrypt_file(f, passphrase=passphrase)
if not secret.ok:
raise AuthError("could not decrypt encryption secret. wrong password?")
f.close()
self.secret = secret.data
self.__passphrase = passphrase
def get_passphrase(self):
return self.__passphrase
def fingerprint(self):
return self.auth_key
def sign(self, plaintext, encoding, passphrase="", detach=True):
r = self.gpg.sign(plaintext, passphrase=passphrase, detach=detach)
if len(r.data) == 0:
raise AuthError("signing failed: " + r.status)
if encoding == "base64":
r = r.data
return r

View File

@ -0,0 +1 @@
from cic.cmd.arg import CmdCtrl

221
cic/cmd/arg.py Normal file
View File

@ -0,0 +1,221 @@
# standard imports
import importlib
import logging
import os
import sys
# external imports
import chainlib.eth.cli
import cic.cmd.easy as cmd_easy
import cic.cmd.export as cmd_export
import cic.cmd.ext as cmd_ext
import cic.cmd.init as cmd_init
import cic.cmd.show as cmd_show
from chainlib.chain import ChainSpec
from cic.auth import PGPAuthCrypt
from cic.crypt.aes import AESCTREncrypt
from cic.http import HTTPSession, PGPClientSession
# local imports
from cic.notify import NotifyWriter
notifier = NotifyWriter()
logg = logging.getLogger(__name__)
script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, "..", "data")
base_config_dir = os.path.join(data_dir, "config")
class NullWriter:
def notify(self, v):
pass
def ouch(self, v):
pass
def write(self, v):
sys.stdout.write(str(v))
class CmdCtrl:
__cmd_alias = {
"u": "user",
"t": "tag",
}
__auth_for = [
"user",
]
def __init__(self, *_args, argv=None, _description=None, logger=None, **_kwargs):
self.args(argv)
self.logging(logger)
self.module()
self.load_config()
self.notifier()
self.auth()
self.blockchain()
self.remote_openers = {}
if self.get("META_URL") is not None:
auth_client_session = PGPClientSession(self.__auth)
self.remote_openers["meta"] = HTTPSession(
self.get("META_URL"),
auth=auth_client_session,
origin=self.config.get("META_HTTP_ORIGIN"),
)
def blockchain(self):
self.chain_spec = ChainSpec.from_chain_str(self.config.get("CHAIN_SPEC"))
self.rpc = chainlib.eth.cli.Rpc()
self.__conn = self.rpc.connect_by_config(self.config)
def args(self, argv):
self.argparser = chainlib.eth.cli.ArgumentParser(
chainlib.eth.cli.argflag_std_read
)
sub = self.argparser.add_subparsers()
sub.dest = "command"
sub_init = sub.add_parser("init", help="initialize new cic data directory")
cmd_init.process_args(sub_init)
sub_show = sub.add_parser(
"show", help="display summary of current state of cic data directory"
)
cmd_show.process_args(sub_show)
sub_export = sub.add_parser(
"export", help="export cic data directory state to a specified target"
)
cmd_export.process_args(sub_export)
sub_ext = sub.add_parser("ext", help="extension helpers")
cmd_ext.process_args(sub_ext)
sub_easy = sub.add_parser("easy", help="Easy Mode Contract Deployment")
cmd_easy.process_args(sub_easy)
self.cmd_args = self.argparser.parse_args(argv)
def module(self):
self.cmd_string = self.cmd_args.command
cmd_string_translate = self.__cmd_alias.get(self.cmd_string)
if cmd_string_translate is not None:
self.cmd_string = cmd_string_translate
if self.cmd_string is None:
self.cmd_string = "none"
self.argparser.print_help()
exit(1)
modname = f"cic.cmd.{self.cmd_string}"
self.logger.debug(f"using module {modname}")
self.cmd_mod = importlib.import_module(modname)
def logging(self, logger):
self.logger = logger
if self.logger is None:
self.logger = logging.getLogger()
if self.cmd_args.vv:
self.logger.setLevel(logging.DEBUG)
elif self.cmd_args.v:
self.logger.setLevel(logging.INFO)
def load_config(self):
override_dir = self.cmd_args.config
if override_dir is None:
p = os.environ.get("HOME")
if p is not None:
p = os.path.join(p, ".config", "cic", "cli")
try:
os.stat(p)
override_dir = p
logg.info(
f"applying user config override from standard location: {p}"
)
except FileNotFoundError:
pass
extra_args = self.cmd_mod.extra_args()
self.config = chainlib.eth.cli.Config.from_args(
self.cmd_args,
base_config_dir=base_config_dir,
extra_args=extra_args,
default_config_dir=override_dir,
)
self.config.add(False, "_SEQ")
self.config.censor("AUTH_PASSPHRASE")
self.logger.debug(f"loaded config:\n{self.config}")
def auth(self):
typ = self.get("AUTH_TYPE")
if typ != "gnupg":
raise NotImplementedError("Valid aut implementations are: gnupg")
default_auth_db_path = None
if os.environ.get("HOME") is not None:
default_auth_db_path = os.path.join(
os.environ["HOME"], ".local/share/cic/clicada"
)
auth_db_path = self.get("AUTH_DB_PATH", default_auth_db_path)
self.__auth = PGPAuthCrypt(
auth_db_path, self.get("AUTH_KEY"), self.get("AUTH_KEYRING_PATH")
)
self.__auth.get_secret(self.get("AUTH_PASSPHRASE"))
self.encrypter = AESCTREncrypt(auth_db_path, self.__auth.secret)
logg.debug(f"loaded auth: {self.__auth}")
logg.debug(f"AUTH_PASSPHRASE: {self.get('AUTH_PASSPHRASE')}")
logg.debug(f"AUTH_KEY: {self.get('AUTH_KEY')}")
logg.debug(f"AUTH_DB_PATH: {self.get('AUTH_DB_PATH')}")
logg.debug(f"AUTH_KEYRING_PATH: {self.get('AUTH_KEYRING_PATH')}")
def get(self, k, default=None):
r = self.config.get(k, default)
if k in [
"_FORCE",
]:
if r is None:
return False
return self.config.true(k)
return r
def chain(self):
return self.chain_spec
def conn(self):
return self.__conn
def execute(self):
self.cmd_mod.execute(self)
def opener(self, k):
return self.remote_openers[k]
def notifier(self):
if logg.root.level >= logging.WARNING:
logging.disable()
self.writer = notifier
else:
self.writer = NullWriter()
def notify(self, v):
self.writer.notify(v)
def ouch(self, v):
self.writer.ouch(v)
print()
def write(self, v):
self.writer.write("")
self.writer.write(v)
print()

412
cic/cmd/easy.py Normal file
View File

@ -0,0 +1,412 @@
from __future__ import annotations
# standard import
import importlib
import json
import logging
import os
from typing import TYPE_CHECKING
import requests
# external imports
from chainlib.chain import ChainSpec
# local imports
from cic import Proof
from cic.actions.deploy import deploy
from cic.attachment import Attachment
from cic.meta import Meta
from cic.network import Network
from cic.token import Token
if TYPE_CHECKING:
from cic.cmd.arg import CmdCtrl
from cic.actions.types import Options, Contract
log = logging.getLogger(__name__)
def process_args(argparser):
argparser.add_argument(
"--skip-gen", action="store_true", default=False, help="Skip Generation"
)
argparser.add_argument(
"--skip-deploy",
action="store_true",
help="Skip Deployment",
)
argparser.add_argument(
"--target",
default="eth",
help="Contract Tech Target (eth)",
)
argparser.add_argument(
"path",
type=str,
help="Path to generate/use contract deployment info",
)
argparser.add_argument(
"-p",
type=str,
help="RPC Provider (http://localhost:8545)",
)
def extra_args():
return {
"path": "_TOKEN_PATH",
"skip_gen": "_TOKEN_SKIP_GEN",
"skip_deploy": "_TOKEN_SKIP_DEPLOY",
"target": "_TOKEN_TARGET",
"p": "RPC_PROVIDER",
}
def validate_args(_args):
pass
CONTRACTS = [
{
"url": "https://gitlab.com/cicnet/eth-erc20/-/raw/master/python/giftable_erc20_token/data/GiftableToken",
"name": "Giftable Token",
},
{
"url": "https://gitlab.com/cicnet/erc20-demurrage-token/-/raw/master/python/erc20_demurrage_token/data/DemurrageTokenSingleNocap",
"name": "Demurrage Token Single No Cap",
},
]
# Download File from Url
def download_file(url: str, directory: str, filename=None) -> (str, bytes):
os.makedirs(directory, exist_ok=True)
filename = filename if filename else url.split("/")[-1]
path = os.path.join(directory, filename)
if not os.path.exists(path):
log.debug(f"Downloading {filename}")
r = requests.get(url, allow_redirects=True)
open(path, "wb").write(r.content)
return path
return path
def get_contract_args(data: list):
for item in data:
if item["type"] == "constructor":
return item["inputs"]
raise Exception("No constructor found in contract")
def print_contract_args(json_path: str):
json_data = json.load(open(json_path, encoding="utf-8"))
print("Contract Args:")
for contract_arg in get_contract_args(json_data):
print(
f"\t{contract_arg.get('name', '<no name>')} - {contract_arg.get('type', '<no type>')}"
)
def select_contract():
print("Contracts:")
print("\t C - Custom (path/url to contract)")
for idx, contract in enumerate(CONTRACTS):
print(f"\t {idx} - {contract['name']}")
val = input("Select contract (C,0,1..): ")
if val.isdigit() and int(val) < len(CONTRACTS):
contract = CONTRACTS[int(val)]
directory = f"./contracts/{contract['name']}"
bin_path = os.path.abspath(download_file(contract["url"] + ".bin", directory))
json_path = download_file(contract["url"] + ".json", directory)
elif val == "C":
possible_bin_location = input("Enter path/url to contract: ")
# possible_bin_location is path
if possible_bin_location[0] == "." or possible_bin_location[0] == "/":
if os.path.exists(possible_bin_location):
bin_path = os.path.abspath(possible_bin_location)
else:
raise Exception(f"File {possible_bin_location} does not exist")
possible_json_path = val.replace(".bin", ".json")
if os.path.exists(possible_json_path):
json_path = possible_json_path
# possible_bin_location is url
else:
bin_path = download_file(possible_bin_location, directory)
else:
print("Invalid selection")
exit(1)
contract_extra_args = []
contract_extra_args_types = []
if os.path.exists(json_path):
json_data = json.load(open(json_path, encoding="utf-8"))
for contract_arg in get_contract_args(json_data):
arg_name = contract_arg.get("name")
arg_type = contract_arg.get("type")
if arg_name not in ["_decimals", "_name", "_symbol"]:
val = input(f"Enter value for {arg_name} ({arg_type}): ")
contract_extra_args.append(val)
if arg_type == "uint128":
contract_extra_args_types.append("uint256")
else:
contract_extra_args_types.append(arg_type)
return {
"bin_path": bin_path,
"json_path": json_path,
"extra_args": contract_extra_args,
"extra_args_types": contract_extra_args_types,
}
def init_token(directory: str, code=""):
contract = select_contract()
code = contract["bin_path"]
contract_extra_args = contract["extra_args"]
contract_extra_args_types = contract["extra_args_types"]
name = input("Enter Token Name (Foo Token): ") or "Foo Token"
symbol = input("Enter Token Symbol (FOO): ") or "FOO"
precision = input("Enter Token Precision (6): ") or 6
supply = input("Enter Token Supply (0): ") or 0
contract_token = Token(
directory,
name=name,
symbol=symbol,
precision=precision,
extra_args=contract_extra_args,
extra_args_types=contract_extra_args_types,
supply=supply,
code=code,
)
contract_token.start()
return contract_token
def init_proof(directory):
description = input("Enter Proof Description (None): ") or None
namespace = input("Enter Proof Namespace (ge): ") or "ge"
issuer = input("Enter Proof Issuer (None): ") or None
contract_proof = Proof(directory, description, namespace, issuer)
contract_proof.start()
return contract_proof
def init_meta(directory):
name = input("Enter Name (None): ") or ""
country_code = input("Enter Country Code (KE): ") or "KE"
location = input("Enter Location (None): ") or ""
adding_contact_info = True
contact = {}
while adding_contact_info:
value = input("Enter contact info (e.g 'phone: +254723522718'): ") or None
if value:
data = value.split(":")
if len(data) != 2:
print("Invalid contact info, you must enter in the format 'key: value'")
continue
contact[data[0].strip()] = data[1].strip()
else:
adding_contact_info = False
contract_meta = Meta(
directory,
name=name,
country_code=country_code,
location=location,
contact=contact,
)
contract_meta.start()
return contract_meta
def init_attachment(directory):
contract_attchment = Attachment(directory)
contract_attchment.start()
input(
f"Please add attachment files to '{os.path.abspath(os.path.join(directory,'attachments'))}' and then press ENTER to continue"
)
contract_attchment.load()
return contract_attchment
def load_contract(directory) -> Contract:
token = Token(path=directory)
proof = Proof(path=directory)
meta = Meta(path=directory)
attachment = Attachment(path=directory)
network = Network(directory)
token.load()
proof.load()
meta.load()
attachment.load()
network.load()
return Contract(
token=token, proof=proof, meta=meta, attachment=attachment, network=network
)
def init_network(
directory,
options: Options,
targets=["eth"],
):
contract_network = Network(directory, targets=targets)
contract_network.start()
for target in targets:
m = importlib.import_module(f"cic.ext.{target}.start")
m.extension_start(
contract_network,
registry_address=options.contract_registry,
chain_spec=options.chain_spec,
rpc_provider=options.rpc_provider,
key_account_address=options.key_account,
)
contract_network.load()
return contract_network
def generate(directory: str, target: str, options: Options) -> Contract:
if os.path.exists(directory):
contine = input(
"Directory already exists, Would you like to delete it? (y/n): "
)
if contine.lower() != "y":
print("Exiting")
exit(1)
else:
print(f"Deleted {directory}")
os.system(f"rm -rf {directory}")
os.makedirs(directory)
token = init_token(directory)
proof = init_proof(directory)
meta = init_meta(directory)
attachment = init_attachment(directory)
network = init_network(
directory,
options,
targets=[target],
)
return Contract(
token=token, proof=proof, meta=meta, attachment=attachment, network=network
)
def get_options(ctrl: CmdCtrl) -> Options:
# Defaults
default_contract_registry = ctrl.config.get(
"CIC_REGISTRY_ADDRESS",
"0xcf60ebc445b636a5ab787f9e8bc465a2a3ef8299", # Comes from /home/will/grassroots/cic-staff-installer/var/cic-staff-client/CIC_REGISTRY_ADDRESS
)
default_key_account = ctrl.config.get(
"AUTH_KEY",
"eb3907ecad74a0013c259d5874ae7f22dcbcc95c", # comes from wallet `eth-keyfile -z -d $WALLET_KEY_FILE`
)
# https://meta.grassrootseconomics.net
# https://auth.grassrootseconomics.net Authenticated Meta
default_metadata_endpoint = ctrl.config.get("META_URL", "https://auth.grassecon.net")
# Keyring folder needs to be dumped out as a private key file from $HOME/.config/cic/staff-client/.gnupg
default_wallet_keyfile = ctrl.config.get(
"WALLET_KEY_FILE",
"/home/will/grassroots/cic-internal-integration/apps/cic-ussd/tests/data/pgp/privatekeys_meta.asc",
) # Show possible wallet keys
# Should be an input???
default_wallet_passphrase = ctrl.config.get("WALLET_PASSPHRASE", "merman")
default_chain_spec = ctrl.config.get("CHAIN_SPEC", "evm:byzantium:8996:bloxberg")
default_rpc_provider = ctrl.config.get(
"RPC_PROVIDER", "https://rpc.grassecon.net"
)
contract_registry = (
input(f"Enter Contract Registry ({default_contract_registry}): ")
or default_contract_registry
)
rpc_provider = (
input(f"Enter RPC Provider ({default_rpc_provider}): ") or default_rpc_provider
)
chain_spec = ChainSpec.from_chain_str(
(input(f"Enter ChainSpec ({default_chain_spec}): ") or default_chain_spec)
)
key_account = (
input(f"Enter KeyAccount ({default_key_account}): ") or default_key_account
)
metadata_endpoint = (
input(f"Enter Metadata Endpoint ({default_metadata_endpoint}): ")
or default_metadata_endpoint
)
auth_passphrase = ctrl.config.get(
"AUTH_PASSPHRASE"
)
auth_keyfile_path = ctrl.config.get(
"AUTH_KEYFILE_PATH"
)
auth_db_path = ctrl.config.get("AUTH_DB_PATH")
return Options(
auth_db_path,
auth_keyfile_path,
auth_passphrase,
contract_registry,
key_account,
chain_spec,
rpc_provider,
metadata_endpoint,
default_wallet_keyfile,
default_wallet_passphrase,
)
def print_contract(contract: Contract):
print(f"[cic.header]\nversion = {contract.proof.version()}\n")
print(f"[cic.token]\n{contract.token}")
print(f"[cic.proof]\n{contract.proof}")
print(f"[cic.meta]\n{contract.meta}")
print(f"[cic.attachment]\n{contract.attachment}")
print(f"[cic.network]\n{contract.network}")
def execute(ctrl: CmdCtrl):
directory = ctrl.config.get("_TOKEN_PATH")
target = ctrl.config.get("_TOKEN_TARGET")
skip_gen = ctrl.config.get("_TOKEN_SKIP_GEN")
skip_deploy = ctrl.config.get("_TOKEN_SKIP_DEPLOY")
options = get_options(ctrl)
if not skip_gen:
contract = generate(directory, target, options)
else:
contract = load_contract(directory)
print_contract(contract)
if not skip_deploy:
ready_to_deploy = input("Ready to deploy? (y/n): ")
if ready_to_deploy == "y":
deploy(
ctrl=ctrl,
contract_directory=directory,
options=options,
keystore_directory="/home/will/grassroots/cic-internal-integration/apps/contract-migration/keystore", # Meta Signer meta.ge.net but not auth.ge.net(usumbufu determins if you can even interact with the server) and this ensures data integrity
target=target,
)
print("Deployed")
else:
print("Not deploying")
if __name__ == "__main__":
# execute()
print("Not Implemented")

View File

@ -23,7 +23,7 @@ from cic.meta import (
from cic.attachment import Attachment from cic.attachment import Attachment
from cic.network import Network from cic.network import Network
from cic.token import Token from cic.token import Token
from typing import Optional
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@ -56,8 +56,9 @@ def init_writers_from_config(config):
return w return w
EArgs = {'target': str, 'directory': str, 'output_directory': str, 'metadata_endpoint': Optional[str], 'y': str}
def execute(config, eargs): def execute(config, eargs: EArgs):
modname = 'cic.ext.{}'.format(eargs.target) modname = 'cic.ext.{}'.format(eargs.target)
cmd_mod = importlib.import_module(modname) cmd_mod = importlib.import_module(modname)
@ -67,7 +68,7 @@ def execute(config, eargs):
if eargs.metadata_endpoint != None: if eargs.metadata_endpoint != None:
MetadataRequestsHandler.base_url = eargs.metadata_endpoint MetadataRequestsHandler.base_url = eargs.metadata_endpoint
MetadataSigner.gpg_path = os.path.join('/tmp') MetadataSigner.gpg_path = os.path.join('/tmp')
MetadataSigner.key_file_path = '/home/lash/src/client/cic/grassrootseconomics/cic-internal-integration/apps/cic-ussd/tests/data/pgp/privatekeys_meta.asc' MetadataSigner.key_file_path = '/home/will/grassroots/cic-internal-integration/apps/cic-ussd/tests/data/pgp/privatekeys_meta.asc'
MetadataSigner.gpg_passphrase = 'merman' MetadataSigner.gpg_passphrase = 'merman'
writers['proof'] = KeyedWriterFactory(MetadataWriter, HTTPWriter).new writers['proof'] = KeyedWriterFactory(MetadataWriter, HTTPWriter).new
writers['attachment'] = KeyedWriterFactory(None, HTTPWriter).new writers['attachment'] = KeyedWriterFactory(None, HTTPWriter).new

View File

@ -25,6 +25,6 @@ def execute(config, eargs):
chain_spec = ChainSpec.from_chain_str(eargs.i) chain_spec = ChainSpec.from_chain_str(eargs.i)
m = importlib.import_module('cic.ext.{}.start'.format(eargs.target)) m = importlib.import_module(f'cic.ext.{eargs.target}.start')
m.extension_start(cn, registry_address=eargs.registry, chain_spec=chain_spec, rpc_provider=config.get('RPC_PROVIDER')) m.extension_start(cn, registry_address=eargs.registry, chain_spec=chain_spec, rpc_provider=config.get('RPC_PROVIDER'))

42
cic/crypt/aes.py Normal file
View File

@ -0,0 +1,42 @@
# standard imports
import os
import logging
import hashlib
from Crypto.Cipher import AES
from Crypto.Util import Counter
from .base import Encrypter
logg = logging.getLogger(__name__)
class AESCTREncrypt(Encrypter):
aes_block_size = 1 << 7
counter_bytes = int(128 / 8)
def __init__(self, db_dir, secret):
self.secret = secret
def key_to_iv(self, k):
h = hashlib.sha256()
h.update(k.encode('utf-8'))
h.update(self.secret)
z = h.digest()
return int.from_bytes(z[:self.counter_bytes], 'big')
def encrypt(self, k, v):
iv = self.key_to_iv(k)
ctr = Counter.new(self.aes_block_size, initial_value=iv)
cipher = AES.new(self.secret, AES.MODE_CTR, counter=ctr)
return cipher.encrypt(v)
def decrypt(self, k, v):
iv = self.key_to_iv(k)
ctr = Counter.new(self.aes_block_size, initial_value=iv)
cipher = AES.new(self.secret, AES.MODE_CTR, counter=ctr)
return cipher.decrypt(v)

8
cic/crypt/base.py Normal file
View File

@ -0,0 +1,8 @@
class Encrypter:
def encrypt(self, v):
raise NotImplementedError()
def decrypt(self, v):
raise NotImplementedError()

View File

@ -3,3 +3,21 @@ meta_writer = cic.output.KVWriter
attachment_writer = cic.output.KVWriter attachment_writer = cic.output.KVWriter
proof_writer = cic.output.KVWriter proof_writer = cic.output.KVWriter
ext_writer = cic.output.KVWriter ext_writer = cic.output.KVWriter
[cic]
registry_address = 0xcf60ebc445b636a5ab787f9e8bc465a2a3ef8299
[meta]
url = https://auth.grassecon.net
http_origin =
[auth]
type = gnupg
db_path = /home/will/.local/share/cic/clicada
keyfile_path = /home/will/.config/cic/staff-client/user.asc
keyring_path = /home/will/.config/cic/staff-client/.gnupg
key = CCE2E1D2D0E36ADE0405E2D0995BB21816313BD5
passphrase =

View File

@ -4,5 +4,10 @@
"precision": 0, "precision": 0,
"code": null, "code": null,
"supply": 0, "supply": 0,
"extra": {} "extra": [
{
"arg": "",
"arg_type": ""
}
]
} }

6
cic/errors.py Normal file
View File

@ -0,0 +1,6 @@
class AuthError(Exception):
pass
class MetadataNotFoundError(Exception):
pass

View File

@ -198,7 +198,7 @@ class CICEth(Extension):
writer.write('token_address', self.token_address.encode('utf-8')) writer.write('token_address', self.token_address.encode('utf-8'))
self.add_outputs('token', r) self.add_outputs('token', r)
if self.token_details['supply'] > 0: if int(self.token_details['supply']) > 0:
c = GiftableToken(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=self.fee_oracle) c = GiftableToken(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle, gas_oracle=self.fee_oracle)
o = c.mint_to(self.token_address, self.resources['token']['key_account'], self.resources['token']['key_account'], self.token_details['supply']) o = c.mint_to(self.token_address, self.resources['token']['key_account'], self.resources['token']['key_account'], self.token_details['supply'])
r = None r = None

View File

@ -1,6 +1,6 @@
# external imports # external imports
from cic_eth_registry import CICRegistry
from chainlib.eth.connection import RPCConnection from chainlib.eth.connection import RPCConnection
from cic_eth_registry import CICRegistry
def extension_start(network, *args, **kwargs): def extension_start(network, *args, **kwargs):
@ -9,18 +9,26 @@ def extension_start(network, *args, **kwargs):
:param network: Network object to read and write settings from :param network: Network object to read and write settings from
:type network: cic.network.Network :type network: cic.network.Network
""" """
CICRegistry.address = kwargs['registry_address'] CICRegistry.address = kwargs["registry_address"]
RPCConnection.register_location(kwargs['rpc_provider'], kwargs['chain_spec']) key_account_address = kwargs["key_account_address"] or ""
conn = RPCConnection.connect(kwargs['chain_spec'])
registry = CICRegistry(kwargs['chain_spec'], conn) RPCConnection.register_location(kwargs["rpc_provider"], kwargs["chain_spec"])
conn = RPCConnection.connect(kwargs["chain_spec"])
address_declarator = registry.by_name('AddressDeclarator') registry = CICRegistry(kwargs["chain_spec"], conn)
network.resource_set('eth', 'address_declarator', address_declarator)
token_index = registry.by_name('TokenRegistry') address_declarator = registry.by_name("AddressDeclarator")
network.resource_set('eth', 'token_index', token_index) network.resource_set(
"eth", "address_declarator", address_declarator, key_account=key_account_address
)
network.set('eth', kwargs['chain_spec']) token_index = registry.by_name("TokenRegistry")
network.resource_set(
"eth", "token_index", token_index, key_account=key_account_address
)
network.resource_set("eth", "token", None, key_account=key_account_address)
network.set("eth", kwargs["chain_spec"])
network.save() network.save()

View File

@ -1,11 +1,13 @@
# standard imports # standard imports
import logging import logging
from typing import TYPE_CHECKING
# external imports # external imports
from hexathon import valid as valid_hex from hexathon import valid as valid_hex
# local imports # local imports
from cic.output import StdoutWriter from cic.output import StdoutWriter
from cic.token import Token
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@ -26,7 +28,16 @@ class Extension:
:param writer: Writer interface receiving the output of the processor :param writer: Writer interface receiving the output of the processor
:type writer: cic.output.OutputWriter :type writer: cic.output.OutputWriter
""" """
def __init__(self, chain_spec, resources, proof, signer=None, rpc=None, outputs_writer=StdoutWriter()):
def __init__(
self,
chain_spec,
resources,
proof,
signer=None,
rpc=None,
outputs_writer=StdoutWriter(),
):
self.resources = resources self.resources = resources
self.proof = proof self.proof = proof
self.chain_spec = chain_spec self.chain_spec = chain_spec
@ -38,9 +49,8 @@ class Extension:
self.outputs = [] self.outputs = []
self.outputs_writer = outputs_writer self.outputs_writer = outputs_writer
# TODO: apply / prepare token can be factored out # TODO: apply / prepare token can be factored out
def apply_token(self, token): def apply_token(self, token: Token):
"""Initialize extension with token data from settings. """Initialize extension with token data from settings.
:param token: Token object :param token: Token object
@ -48,10 +58,27 @@ class Extension:
:rtype: dict :rtype: dict
:returns: Token data state of extension after load :returns: Token data state of extension after load
""" """
return self.prepare_token(token.name, token.symbol, token.precision, token.code, token.supply) return self.prepare_token(
token.name,
token.symbol,
token.precision,
token.code,
token.supply,
token.extra_args,
token.extra_args_types,
)
def prepare_token(self, name, symbol, precision, code, supply, extra=[], extra_types=[], positions=None): def prepare_token(
self,
name,
symbol,
precision,
code,
supply,
extra=[],
extra_types=[],
positions=None,
):
"""Initialize extension token data. """Initialize extension token data.
:param name: Token name :param name: Token name
@ -65,7 +92,7 @@ class Extension:
:param supply: Token supply (in smallest precision units) :param supply: Token supply (in smallest precision units)
:type supply: int :type supply: int
:param extra: Extra parameters to pass to token application constructor :param extra: Extra parameters to pass to token application constructor
:type extra: list :type extra: list
:param extra_types: Type specifications for extra parameters :param extra_types: Type specifications for extra parameters
:type extra_types: list :type extra_types: list
:param positions: Sequence of parameter indices to pass to application constructor :param positions: Sequence of parameter indices to pass to application constructor
@ -74,24 +101,22 @@ class Extension:
:returns: Token data state of extension after load :returns: Token data state of extension after load
""" """
self.token_details = { self.token_details = {
'name': name, "name": name,
'symbol': symbol, "symbol": symbol,
'precision': precision, "precision": precision,
'code': code, "code": code,
'supply': supply, "supply": supply,
'extra': extra, "extra": extra,
'extra_types': extra_types, "extra_types": extra_types,
'positions': positions, "positions": positions,
} }
logg.debug(f"token details: {self.token_details}")
return self.token_details return self.token_details
def prepare_extension(self): def prepare_extension(self):
"""Prepare extension for publishing (noop) """Prepare extension for publishing (noop)"""
"""
pass pass
def parse_code_as_file(self, v): def parse_code_as_file(self, v):
"""Helper method to load application bytecode from file into extensions token data state. """Helper method to load application bytecode from file into extensions token data state.
@ -101,18 +126,17 @@ class Extension:
:type v: str :type v: str
""" """
try: try:
f = open(v, 'r') f = open(v, "r")
r = f.read() r = f.read()
f.close() f.close()
self.parse_code_as_hex(r) self.parse_code_as_hex(r)
except FileNotFoundError: except FileNotFoundError as e:
logg.debug('could not parse code as file: {}'.format(e)) logg.debug("could not parse code as file: {}".format(e))
pass pass
except IsADirectoryError: except IsADirectoryError as e:
logg.debug('could not parse code as file: {}'.format(e)) logg.debug("could not parse code as file: {}".format(e))
pass pass
def parse_code_as_hex(self, v): def parse_code_as_hex(self, v):
"""Helper method to load application bytecode from hex data into extension token data state. """Helper method to load application bytecode from hex data into extension token data state.
@ -121,13 +145,12 @@ class Extension:
:param v: Bytecode as hex :param v: Bytecode as hex
:type v: str :type v: str
""" """
try: try:
self.token_code = valid_hex(v) self.token_code = valid_hex(v)
except ValueError as e: except ValueError as e:
logg.debug('could not parse code as hex: {}'.format(e)) logg.debug("could not parse code as hex: {}".format(e))
pass pass
def load_code(self, hint=None): def load_code(self, hint=None):
"""Attempt to load token application bytecode using token settings. """Attempt to load token application bytecode using token settings.
@ -136,24 +159,23 @@ class Extension:
:rtype: str (hex) :rtype: str (hex)
:return: Bytecode loaded into extension token data state :return: Bytecode loaded into extension token data state
""" """
code = self.token_details['code'] code = self.token_details["code"]
if hint == 'hex': if hint == "hex":
self.token_code = valid_hex(code) self.token_code = valid_hex(code)
for m in [ for m in [
self.parse_code_as_hex, self.parse_code_as_hex,
self.parse_code_as_file, self.parse_code_as_file,
]: ]:
m(code) m(code)
if self.token_code != None: if self.token_code != None:
break break
if self.token_code == None: if self.token_code == None:
raise RuntimeError('could not successfully parse token code') raise RuntimeError("could not successfully parse token code")
return self.token_code return self.token_code
def process(self, writer=None): def process(self, writer=None):
"""Adapter used by Processor to process the extensions implementing the Extension base class. """Adapter used by Processor to process the extensions implementing the Extension base class.
@ -167,26 +189,26 @@ class Extension:
if writer == None: if writer == None:
writer = self.outputs_writer writer = self.outputs_writer
tasks = [] tasks = []
self.token_address = self.resources['token']['reference'] self.token_address = self.resources["token"]["reference"]
# TODO: get token details when token address is not none # TODO: get token details when token address is not none
if self.token_address == None: if self.token_address == None:
if self.token_details['code'] == None: if self.token_details["code"] == None:
raise RuntimeError('neither token address nor token code has been set') raise RuntimeError("neither token address nor token code has been set")
self.load_code() self.load_code()
tasks.append('token') tasks.append("token")
for k in self.resources.keys(): for k in self.resources.keys():
if k == 'token': if k == "token":
continue continue
if self.resources[k]['reference'] != None: if self.resources[k]["reference"] != None:
tasks.append(k) tasks.append(k)
self.prepare_extension() self.prepare_extension()
for task in tasks: for task in tasks:
logg.debug('extension adapter process {}'.format(task)) logg.debug("extension adapter process {}".format(task))
r = getattr(self, 'process_' + task)(writer=writer) r = getattr(self, "process_" + task)(writer=writer)
return (self.token_address, self.token_details.get('symbol')) return (self.token_address, self.token_details.get("symbol"))

111
cic/http.py Normal file
View File

@ -0,0 +1,111 @@
# standard imports
import hashlib
import logging
import os
import ssl
import urllib.parse
from http.client import HTTPResponse
from socket import getservbyname
from urllib.request import HTTPSHandler
# external imports
from usumbufu.client.base import BaseTokenStore, ClientSession
from usumbufu.client.bearer import BearerClientSession
from usumbufu.client.hoba import HobaClientSession
logg = logging.getLogger(__name__)
class PGPClientSession(HobaClientSession):
alg = "969"
def __init__(self, auth):
self.auth = auth
self.origin = None
self.fingerprint = self.auth.fingerprint()
def sign_auth_challenge(self, plaintext, hoba, encoding):
passphrase = self.auth.get_passphrase()
r = self.auth.sign(plaintext, encoding, passphrase=passphrase, detach=True)
hoba.signature = r
return str(hoba)
def __str__(self):
return "clicada hoba/pgp auth"
def __repr__(self):
return "clicada hoba/pgp auth"
class HTTPSession:
token_dir = f"/run/user/{os.getuid()}/clicada/usumbufu/.token"
def __init__(self, url, auth=None, origin=None):
self.base_url = url
url_parts = urllib.parse.urlsplit(self.base_url)
url_parts_origin_host = url_parts[1].split(":")
host = url_parts_origin_host[0]
try:
host = host + ":" + url_parts_origin_host[1]
except IndexError:
host = host + ":" + str(getservbyname(url_parts[0]))
logg.info(
f"changed origin with missing port number from {url_parts[1]} to {host}"
)
url_parts_origin = (
url_parts[0],
host,
"",
"",
"",
)
self.origin = origin
if self.origin is None:
self.origin = urllib.parse.urlunsplit(url_parts_origin)
else:
logg.debug(f"overriding http origin for {url} with {self.origin}")
h = hashlib.sha256()
h.update(self.base_url.encode("utf-8"))
z = h.digest()
token_store_dir = os.path.join(self.token_dir, z.hex())
os.makedirs(token_store_dir, exist_ok=True)
self.token_store = BaseTokenStore(path=token_store_dir)
logg.debug(
f"token store: \n{self.token_store}\n origin: {self.origin}\n token_store_dir: {token_store_dir}\n"
)
self.session = ClientSession(self.origin, token_store=self.token_store)
bearer_handler = BearerClientSession(self.origin, token_store=self.token_store)
self.session.add_subhandler(bearer_handler)
if auth is not None:
auth.origin = self.origin
self.session.add_subhandler(auth)
ctx = ssl.create_default_context()
ctx.load_verify_locations(
capath="/home/will/grassroots/cic-staff-installer/keys/ge.ca"
)
https_handler = HTTPSHandler(context=ctx)
self.session.add_parent(parent=https_handler)
self.opener = urllib.request.build_opener(self.session)
def open(self, url, method=None, data: bytes = None, headers=None):
logg.debug(f"headers: {headers}")
logg.debug(f"token store: \n{self.token_store}\n origin: {self.origin}")
req = urllib.request.Request(url=url, data=data, headers=headers, method=method)
logg.debug(f"open {url} with opener {self}")
logg.debug(req.get_full_url())
logg.debug(f"handlers {self.opener.handlers}")
response: HTTPResponse = self.opener.open(req)
status = response.getcode()
logg.debug(f"{url} returned {status}")
return response.read().decode("utf-8")
def __str__(self):
return str(self.session)

View File

@ -1,21 +1,28 @@
from __future__ import annotations
# standard imports # standard imports
import os import base64
import json import json
import logging import logging
import base64 import os
# types
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from cic.cmd.arg import CmdCtrl
# external imports # external imports
from cic_types import MetadataPointer from cic_types import MetadataPointer
from cic_types.processor import generate_metadata_pointer from cic_types.processor import generate_metadata_pointer
from cic_types.ext.metadata import MetadataRequestsHandler
from hexathon import strip_0x from hexathon import strip_0x
# local imports from cic.MetaRequestHandler import MetadataRequestsHandler
from .base import (
Data,
data_dir,
)
from cic.output import OutputWriter from cic.output import OutputWriter
from cic.utils import object_to_str
# local imports
from .base import Data, data_dir
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@ -29,92 +36,94 @@ class Meta(Data):
:param writer: Writer interface receiving the output of the processor :param writer: Writer interface receiving the output of the processor
:type writer: cic.output.OutputWriter :type writer: cic.output.OutputWriter
""" """
def __init__(self, path='.', writer=None):
def __init__(
self, path=".", writer=None, name="", location="", country_code="", contact={}
):
super(Meta, self).__init__() super(Meta, self).__init__()
self.name = None self.name = name
self.contact = {} self.contact = contact
self.country_code = country_code
self.location = location
self.path = path self.path = path
self.writer = writer self.writer = writer
self.meta_path = os.path.join(self.path, 'meta.json') self.meta_path = os.path.join(self.path, "meta.json")
def load(self): def load(self):
"""Load metadata from settings. """Load metadata from settings."""
"""
super(Meta, self).load() super(Meta, self).load()
f = open(self.meta_path, 'r') f = open(self.meta_path, "r", encoding="utf-8")
o = json.load(f) o = json.load(f)
f.close() f.close()
self.name = o['name'] self.name = o["name"]
self.contact = o['contact'] self.contact = o["contact"]
self.country_code = o["country_code"]
self.location = o["location"]
self.inited = True self.inited = True
def start(self): def start(self):
"""Initialize metadata settings from template. """Initialize metadata settings from template."""
"""
super(Meta, self).start() super(Meta, self).start()
meta_template_file_path = os.path.join(data_dir, 'meta_template_v{}.json'.format(self.version())) meta_template_file_path = os.path.join(
data_dir, f"meta_template_v{self.version()}.json"
f = open(meta_template_file_path) )
f = open(meta_template_file_path, encoding="utf-8")
o = json.load(f) o = json.load(f)
f.close() f.close()
f = open(self.meta_path, 'w') o["name"] = self.name
o["contact"] = self.contact
o["country_code"] = self.country_code
o["location"] = self.location
f = open(self.meta_path, "w", encoding="utf-8")
json.dump(o, f, sort_keys=True, indent="\t") json.dump(o, f, sort_keys=True, indent="\t")
f.close() f.close()
def reference(self, token_address): def reference(self, token_address):
"""Calculate the mutable reference for the token metadata. """Calculate the mutable reference for the token metadata."""
"""
token_address_bytes = bytes.fromhex(strip_0x(token_address)) token_address_bytes = bytes.fromhex(strip_0x(token_address))
return generate_metadata_pointer(token_address_bytes, MetadataPointer.TOKEN_META) return generate_metadata_pointer(
token_address_bytes, MetadataPointer.TOKEN_META
)
def asdict(self): def asdict(self):
"""Output proof state to dict. """Output proof state to dict."""
"""
return { return {
'name': self.name, "name": self.name,
'contact': self.contact, "country_code": self.country_code,
} "location": self.location,
"contact": self.contact,
}
def process(self, token_address=None, token_symbol=None, writer=None): def process(self, token_address=None, token_symbol=None, writer=None):
"""Serialize and publish metadata. """Serialize and publish metadata.
See cic.processor.Processor.process See cic.processor.Processor.process
""" """
if writer == None: if writer is None:
writer = self.writer writer = self.writer
v = json.dumps(self.asdict()) v = json.dumps(self.asdict())
token_address_bytes = bytes.fromhex(strip_0x(token_address)) token_address_bytes = bytes.fromhex(strip_0x(token_address))
k = generate_metadata_pointer(token_address_bytes, MetadataPointer.TOKEN_META) k = generate_metadata_pointer(token_address_bytes, MetadataPointer.TOKEN_META)
writer.write(k, v.encode('utf-8')) writer.write(k, v.encode("utf-8"))
token_symbol_bytes = token_symbol.encode('utf-8') token_symbol_bytes = token_symbol.encode("utf-8")
k = generate_metadata_pointer(token_symbol_bytes, MetadataPointer.TOKEN_META_SYMBOL) k = generate_metadata_pointer(
writer.write(k, v.encode('utf-8')) token_symbol_bytes, MetadataPointer.TOKEN_META_SYMBOL
)
writer.write(k, v.encode("utf-8"))
return (k, v) return (k, v)
def __str__(self): def __str__(self):
s = "contact.name = {}\n".format(self.name) return object_to_str(self, ["name", "contact", "country_code", "location"])
for k in self.contact.keys():
if self.contact[k] == '':
continue
s += "contact.{} = {}\n".format(k.lower(), self.contact[k])
return s
class MetadataWriter(OutputWriter): class MetadataWriter(OutputWriter):
@ -128,14 +137,13 @@ class MetadataWriter(OutputWriter):
def write(self, k, v): def write(self, k, v):
rq = MetadataRequestsHandler(MetadataPointer.NONE, bytes.fromhex(k)) rq = MetadataRequestsHandler(MetadataPointer.NONE, bytes.fromhex(k))
try: try:
v = v.decode('utf-8') v = v.decode("utf-8")
v = json.loads(v) v = json.loads(v)
logg.debug('metadatawriter bindecode {} {}'.format(k, v)) logg.debug(f"metadatawriter bindecode {k} {v}")
except UnicodeDecodeError: except UnicodeDecodeError:
v = base64.b64encode(v).decode('utf-8') v = base64.b64encode(v).decode("utf-8")
v = json.loads(json.dumps(v)) v = json.loads(json.dumps(v))
logg.debug('metadatawriter b64encode {} {}'.format(k, v)) logg.debug(f"metadatawriter b64encode {k} {v}")
r = rq.create(v) r = rq.create(v)
logg.info('metadata submitted at {}'.format(k)) logg.info(f"metadata submitted at {k}")
return r return r

View File

@ -54,7 +54,7 @@ class Network(Data):
""" """
super(Network, self).load() super(Network, self).load()
network_template_file_path = os.path.join(data_dir, 'network_template_v{}.json'.format(self.version())) network_template_file_path = os.path.join(data_dir, f'network_template_v{self.version()}.json')
f = open(network_template_file_path) f = open(network_template_file_path)
o_part = json.load(f) o_part = json.load(f)
@ -138,11 +138,11 @@ class Network(Data):
def __str__(self): def __str__(self):
s = '' s = ''
for k in self.resources.keys(): for resource in self.resources.keys():
for kk in self.resources[k]['contents'].keys(): for content_key in self.resources[resource]['contents'].keys():
v = self.resources[k]['contents'][kk] content_value = self.resources[resource]['contents'][content_key]
if v == None: if content_value == None:
v = '' content_value = ''
s += '{}.{} = {}\n'.format(k, kk, v) s += f'{resource}.{content_key} = {content_value}\n'
return s return s

33
cic/notify.py Normal file
View File

@ -0,0 +1,33 @@
# standard imports
import os
import sys
import shutil
class NotifyWriter:
def __init__(self, writer=sys.stdout):
(c, r) = shutil.get_terminal_size()
self.cols = c
self.fmt = "\r{:" + "<{}".format(c) + "}"
self.w = writer
self.notify_max = self.cols - 4
def notify(self, v):
if len(v) > self.notify_max:
v = v[:self.notify_max]
self.write('\x1b[0;36m... ' + v + '\x1b[0;39m')
def ouch(self, v):
if len(v) > self.notify_max:
v = v[:self.notify_max]
self.write('\x1b[0;91m!!! ' + v + '\x1b[0;39m')
def write(self, v):
s = str(v)
if len(s) > self.cols:
s = s[:self.cols]
self.w.write(self.fmt.format(s))

View File

@ -4,6 +4,7 @@ import sys
import logging import logging
import urllib.request import urllib.request
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@ -12,7 +13,7 @@ class OutputWriter:
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
pass pass
def write(self, k, v): def write(self, k, v, **kwargs):
raise NotImplementedError() raise NotImplementedError()
@ -51,7 +52,7 @@ class HTTPWriter(OutputWriter):
path = self.path path = self.path
if k != None: if k != None:
path = os.path.join(path, k) path = os.path.join(path, k)
logg.debug('http writer post {}'.format(path)) logg.debug(f'http writer post {path} \n key: {k}, value: {v}')
rq = urllib.request.Request(path, method='POST', data=v) rq = urllib.request.Request(path, method='POST', data=v)
r = urllib.request.urlopen(rq) r = urllib.request.urlopen(rq)
logg.info('http writer submitted at {}'.format(r.read())) logg.info('http writer submitted at {}'.format(r.read()))

View File

@ -18,18 +18,25 @@ class Processor:
:param extensions: Extension contexts to publish to :param extensions: Extension contexts to publish to
:type extensions: list of cic.extension.Extension :type extensions: list of cic.extension.Extension
""" """
def __init__(self, proof=None, attachment=None, metadata=None, outputs_writer=None, extensions=[]):
def __init__(
self,
proof=None,
attachment=None,
metadata=None,
outputs_writer=None,
extensions=[],
):
self.token_address = None self.token_address = None
self.extensions = extensions self.extensions = extensions
self.cores = { self.cores = {
'metadata': metadata, "metadata": metadata,
'attachment': attachment, "attachment": attachment,
'proof': proof, "proof": proof,
} }
self.outputs = [] self.outputs = []
self.__outputs_writer = outputs_writer self.__outputs_writer = outputs_writer
def writer(self): def writer(self):
"""Return the writer instance that the process is using. """Return the writer instance that the process is using.
@ -38,7 +45,6 @@ class Processor:
""" """
return self.__outputs_writer return self.__outputs_writer
def get_outputs(self): def get_outputs(self):
"""Return all written outputs. """Return all written outputs.
@ -53,7 +59,6 @@ class Processor:
outputs += self.outputs outputs += self.outputs
return outputs return outputs
def process(self, writer=None): def process(self, writer=None):
"""Serializes and publishes all token data. """Serializes and publishes all token data.
@ -66,18 +71,23 @@ class Processor:
""" """
tasks = [ tasks = [
'attachment', "attachment",
'proof', "proof",
'metadata', "metadata",
] ]
for ext in self.extensions: for ext in self.extensions:
(token_address, token_symbol) = ext.process() # (token_address, token_symbol) = ext.process()
token_address="1a4b2d1B564456f07d5920FeEcdF86077F7bba1E"
token_symbol="WILLY"
for task in tasks: for task in tasks:
a = self.cores.get(task) a = self.cores.get(task)
if a == None: if a == None:
logg.debug('skipping missing task receiver "{}"'.format(task)) logg.debug('skipping missing task receiver "{}"'.format(task))
continue continue
v = a.process(token_address=token_address, token_symbol=token_symbol, writer=self.__outputs_writer) v = a.process(
token_address=token_address,
token_symbol=token_symbol,
writer=self.__outputs_writer,
)
self.outputs.append(v) self.outputs.append(v)

View File

@ -1,19 +1,19 @@
# standard imports # standard imports
import os
import json import json
import logging import logging
import os
import tempfile import tempfile
import cbor2
from cic_types import MetadataPointer
from cic_types.processor import generate_metadata_pointer
# external imports # external imports
from hexathon import strip_0x from hexathon import strip_0x
from cic_types import MetadataPointer
from cic_types.processor import generate_metadata_pointer from cic.utils import object_to_str
from cic_types.ext.metadata import MetadataRequestsHandler
# local imports # local imports
from .base import * from .base import *
from cic.output import OutputWriter
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@ -33,34 +33,40 @@ class Proof(Data):
:type writer: cic.output.OutputWriter :type writer: cic.output.OutputWriter
""" """
def __init__(self, path='.', attachments=None, writer=None): def __init__(
self,
path=".",
description=None,
namespace="ge",
issuer=None,
attachments=None,
writer=None,
):
super(Proof, self).__init__() super(Proof, self).__init__()
self.proofs = [] self.proofs = []
self.namespace = 'ge' self.namespace = namespace
self.description = None self.description = description
self.issuer = None self.issuer = issuer
self.path = path self.path = path
self.writer = writer self.writer = writer
self.extra_attachments = attachments self.extra_attachments = attachments
self.attachments = {} self.attachments = {}
self.proof_path = os.path.join(self.path, 'proof.json') self.proof_path = os.path.join(self.path, "proof.json")
self.temp_proof_path = tempfile.mkstemp()[1] self.temp_proof_path = tempfile.mkstemp()[1]
def load(self): def load(self):
"""Load proof data from settings. """Load proof data from settings."""
"""
super(Proof, self).load() super(Proof, self).load()
f = open(self.proof_path, 'r') f = open(self.proof_path, "r")
o = json.load(f) o = json.load(f)
f.close() f.close()
self.set_version(o['version']) self.set_version(o["version"])
self.description = o['description'] self.description = o["description"]
self.namespace = o['namespace'] self.namespace = o["namespace"]
self.issuer = o['issuer'] self.issuer = o["issuer"]
self.proofs = o['proofs'] self.proofs = o["proofs"]
if self.extra_attachments != None: if self.extra_attachments != None:
a = self.extra_attachments.asdict() a = self.extra_attachments.asdict()
@ -72,34 +78,33 @@ class Proof(Data):
self.inited = True self.inited = True
def start(self): def start(self):
"""Initialize proof settings from template. """Initialize proof settings from template."""
"""
super(Proof, self).start() super(Proof, self).start()
proof_template_file_path = os.path.join(data_dir, 'proof_template_v{}.json'.format(self.version())) proof_template_file_path = os.path.join(
data_dir, "proof_template_v{}.json".format(self.version())
)
f = open(proof_template_file_path) f = open(proof_template_file_path)
o = json.load(f) o = json.load(f)
f.close() f.close()
o["issuer"] = self.issuer
f = open(self.proof_path, 'w') o["description"] = self.description
o["namespace"] = self.namespace
f = open(self.proof_path, "w")
json.dump(o, f, sort_keys=True, indent="\t") json.dump(o, f, sort_keys=True, indent="\t")
f.close() f.close()
def asdict(self): def asdict(self):
"""Output proof state to dict. """Output proof state to dict."""
"""
return { return {
'version': self.version(), "version": self.version(),
'namespace': self.namespace, "namespace": self.namespace,
'description': self.description, "description": self.description,
'issuer': self.issuer, "issuer": self.issuer,
'proofs': self.proofs, "proofs": self.proofs,
} }
# TODO: the efficiency of this method could probably be improved. # TODO: the efficiency of this method could probably be improved.
def __get_ordered_hashes(self): def __get_ordered_hashes(self):
@ -108,34 +113,30 @@ class Proof(Data):
return ks return ks
# def get(self):
# def get(self): # hsh = self.hash(b).hex()
# hsh = self.hash(b).hex() # self.attachments[hsh] = self.temp_proof_path
# self.attachments[hsh] = self.temp_proof_path # logg.debug('cbor of {} is {} hashes to {}'.format(v, b.hex(), hsh))
# logg.debug('cbor of {} is {} hashes to {}'.format(v, b.hex(), hsh))
def root(self): def root(self):
"""Calculate the root digest from the serialized proof object. """Calculate the root digest from the serialized proof object."""
""" v = self.asdict()
v = self.asdict() # b = cbor2.dumps(v)
#b = cbor2.dumps(v)
b = json.dumps(v) b = json.dumps(v)
f = open(self.temp_proof_path, 'w') f = open(self.temp_proof_path, "w")
f.write(b) f.write(b)
f.close() f.close()
b = b.encode('utf-8') b = b.encode("utf-8")
k = self.hash(b) k = self.hash(b)
return (k.hex(), b) return (k.hex(), b)
def process(self, token_address=None, token_symbol=None, writer=None): def process(self, token_address=None, token_symbol=None, writer=None):
"""Serialize and publish proof. """Serialize and publish proof.
See cic.processor.Processor.process See cic.processor.Processor.process
""" """
if writer == None: if writer == None:
writer = self.writer writer = self.writer
@ -144,38 +145,41 @@ class Proof(Data):
writer.write(k, v) writer.write(k, v)
root_key = k root_key = k
token_symbol_bytes = token_symbol.encode('utf-8') token_symbol_bytes = token_symbol.encode("utf-8")
k = generate_metadata_pointer(token_symbol_bytes, MetadataPointer.TOKEN_PROOF_SYMBOL) k = generate_metadata_pointer(
token_symbol_bytes, MetadataPointer.TOKEN_PROOF_SYMBOL
)
writer.write(k, v) writer.write(k, v)
token_address_bytes = bytes.fromhex(strip_0x(token_address)) token_address_bytes = bytes.fromhex(strip_0x(token_address))
k = generate_metadata_pointer(token_address_bytes, MetadataPointer.TOKEN_PROOF) k = generate_metadata_pointer(token_address_bytes, MetadataPointer.TOKEN_PROOF)
writer.write(k, v) writer.write(k, v)
# (hsh, hshs) = self.get() # (hsh, hshs) = self.get()
#hshs = list(map(strip_0x, hshs)) # hshs = list(map(strip_0x, hshs))
# hshs_bin = list(map(bytes.fromhex, hshs)) # hshs_bin = list(map(bytes.fromhex, hshs))
# hshs_cat = b''.join(hshs_bin) # hshs_cat = b''.join(hshs_bin)
# f = open(self.temp_proof_path, 'rb') # f = open(self.temp_proof_path, 'rb')
# v = f.read() # v = f.read()
# f.close() # f.close()
# writer.write(hsh, v) # writer.write(hsh, v)
# r = self.hash(hshs_cat) # r = self.hash(hshs_cat)
# r_hex = r.hex() # r_hex = r.hex()
#logg.debug('generated proof {} for hashes {}'.format(r_hex, hshs)) # logg.debug('generated proof {} for hashes {}'.format(r_hex, hshs))
#writer.write(r_hex, hshs_cat) # writer.write(r_hex, hshs_cat)
o = self.asdict() o = self.asdict()
f = open(self.proof_path, 'w') f = open(self.proof_path, "w")
json.dump(o, f, sort_keys=True, indent="\t") json.dump(o, f, sort_keys=True, indent="\t")
f.close() f.close()
return root_key return root_key
def __str__(self): def __str__(self):
return "description = {}\n".format(self.description) return object_to_str(
self, ["description", "issuer", "namespace", "version()", "proofs"]
)

View File

@ -11,6 +11,7 @@ import cic.cmd.init as cmd_init
import cic.cmd.show as cmd_show import cic.cmd.show as cmd_show
import cic.cmd.ext as cmd_ext import cic.cmd.ext as cmd_ext
import cic.cmd.export as cmd_export import cic.cmd.export as cmd_export
import cic.cmd.easy as cmd_easy
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
@ -34,6 +35,8 @@ cmd_export.process_args(sub_export)
sub_ext = sub.add_parser('ext', help='extension helpers') sub_ext = sub.add_parser('ext', help='extension helpers')
cmd_ext.process_args(sub_ext) cmd_ext.process_args(sub_ext)
sub_easy = sub.add_parser('easy', help='Easy Mode Contract Deployment')
cmd_easy.process_args(sub_easy)
args = argparser.parse_args(sys.argv[1:]) args = argparser.parse_args(sys.argv[1:])

19
cic/runnable/test_cmd.py Normal file
View File

@ -0,0 +1,19 @@
# standard imports
import sys
import logging
# local imports
from cic.cmd import CmdCtrl
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
ctrl = CmdCtrl(argv=sys.argv[1:], logger=logg)
def main():
ctrl.execute()
if __name__ == '__main__':
main()

View File

@ -1,12 +1,9 @@
# standard imports # standard imports
import os
import json import json
import os
# local imports # local imports
from .base import ( from .base import Data, data_dir
Data,
data_dir,
)
class Token(Data): class Token(Data):
@ -27,62 +24,91 @@ class Token(Data):
:param code: Bytecode for token chain application :param code: Bytecode for token chain application
:type code: str (hex) :type code: str (hex)
""" """
def __init__(self, path='.', name=None, symbol=None, precision=1, supply=0, code=None):
def __init__(
self,
path=".",
name=None,
symbol=None,
precision=1,
supply=0,
code=None,
extra_args=[],
extra_args_types=[],
):
super(Token, self).__init__() super(Token, self).__init__()
self.name = name self.name = name
self.symbol = symbol self.symbol = symbol
self.supply = supply self.supply = supply
self.precision = precision self.precision = precision
self.code = code self.code = code
self.extra_args = None self.extra_args = extra_args
self.extra_args_types = extra_args_types
self.path = path self.path = path
self.token_path = os.path.join(self.path, 'token.json') self.token_path = os.path.join(self.path, "token.json")
def load(self): def load(self):
"""Load token data from settings. """Load token data from settings."""
"""
super(Token, self).load() super(Token, self).load()
f = open(self.token_path, 'r') f = open(self.token_path, "r")
o = json.load(f) o = json.load(f)
f.close() f.close()
self.name = o['name'] self.name = o["name"]
self.symbol = o['symbol'] self.symbol = o["symbol"]
self.precision = o['precision'] self.precision = o["precision"]
self.code = o['code'] self.code = o["code"]
self.supply = o['supply'] self.supply = o["supply"]
self.extra_args = o['extra'] extras = []
extra_types = []
token_extras: list = o["extra"]
if token_extras:
for token_extra in token_extras:
arg = token_extra.get("arg")
arg_type = token_extra.get("arg_type")
if arg:
extras.append(arg)
if arg_type:
extra_types.append(arg_type)
self.extra_args = extras
self.extra_args_types = extra_types
self.inited = True self.inited = True
def start(self): def start(self):
"""Initialize token settings from arguments passed to the constructor and/or template. """Initialize token settings from arguments passed to the constructor and/or template."""
"""
super(Token, self).load() super(Token, self).load()
token_template_file_path = os.path.join(data_dir, 'token_template_v{}.json'.format(self.version())) token_template_file_path = os.path.join(
data_dir, "token_template_v{}.json".format(self.version())
)
f = open(token_template_file_path) f = open(token_template_file_path)
o = json.load(f) o = json.load(f)
f.close() f.close()
o["name"] = self.name
o["symbol"] = self.symbol
o["precision"] = self.precision
o["code"] = self.code
o["supply"] = self.supply
extra = []
for i in range(len(self.extra_args)):
extra.append(
{"arg": self.extra_args[i], "arg_type": self.extra_args_types[i]}
)
if len(extra):
o["extra"] = extra
print(extra)
o['name'] = self.name f = open(self.token_path, "w")
o['symbol'] = self.symbol
o['precision'] = self.precision
o['code'] = self.code
o['supply'] = self.supply
f = open(self.token_path, 'w')
json.dump(o, f, sort_keys=True, indent="\t") json.dump(o, f, sort_keys=True, indent="\t")
f.close() f.close()
def __str__(self): def __str__(self):
s = """name = {} s = f"name = {self.name}\n"
symbol = {} s += f"symbol = {self.symbol}\n"
precision = {} s += f"precision = {self.precision}\n"
""".format(self.name, self.symbol, self.precision) s += f"supply = {self.supply}\n"
for idx, extra in enumerate(self.extra_args):
s += f"extra_args[{idx}]({self.extra_args_types[idx]}) = {extra}\n"
return s return s

24
cic/utils.py Normal file
View File

@ -0,0 +1,24 @@
def object_to_str(obj, keys):
"""Return a string representation of an object."""
s = ""
for key in keys:
value = eval("obj." + key)
key = key.replace("()", "")
if type(value) == str:
s += f"{key} = {value}\n"
elif type(value) == list:
for idx, vv in enumerate(value):
if not vv:
s += f"{key}[{idx}] = \n"
continue
s += f"{key}[{idx}] = {vv}\n"
elif type(value) == dict:
for vv_key in value.keys():
vv_value = value[vv_key]
if not vv_value:
s += f"{key}.{vv_key} = \n"
continue
s += f"{key}.{vv_key} = {vv_value}\n"
else:
s += f"{key} = {str(value)}\n"
return s

View File

@ -1,4 +1,4 @@
chainlib-eth~=0.0.13 chainlib-eth~=0.0.21
funga-eth~=0.5.1 funga-eth~=0.5.1
eth-token-index~=0.2.4 eth-token-index~=0.2.4
eth-address-index~=0.2.4 eth-address-index~=0.2.4

View File

@ -1,5 +1,6 @@
funga-eth~=0.5.1 funga-eth~=0.5.1
cic-types~=0.2.1a5 cic-types~=0.2.1a8
confini~=0.5.1 confini~=0.5.3
chainlib~=0.0.13 chainlib~=0.0.17
cbor2==5.4.1 cbor2==5.4.1
usumbufu==0.3.6

View File

@ -1,31 +1,55 @@
# standard imports # standard imports
import unittest
import logging import logging
import os import os
import unittest
# external imports
from hexathon import strip_0x
# local imports # local imports
from cic.meta import Meta from cic.meta import Meta
# external imports
from hexathon import strip_0x
# test imports # test imports
from tests.base_cic import ( from tests.base_cic import TestCICBase, test_data_dir
TestCICBase,
test_data_dir,
)
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger() logg = logging.getLogger()
class TestCICMeta(TestCICBase): class TestCICMeta(TestCICBase):
def test_meta(self): def test_meta(self):
fp = os.path.join(test_data_dir, 'proof') fp = os.path.join(test_data_dir, "proof")
m = Meta(fp) m = Meta(fp)
m.load() m.load()
self.assertEquals(
str(m),
"""name = Test
contact.phone = 0700-123456
country_code = KE
location = Kilifi
""",
)
def test_meta_with_initial_values(self):
fp = os.path.join(test_data_dir, "proof")
m = Meta(
fp,
name="TestName",
location="TestLocation",
country_code="TestCC",
contact={
"phone": "0723578455158",
},
)
self.assertEquals(
str(m),
"""name = TestName
contact.phone = 0723578455158
country_code = TestCC
location = TestLocation
""",
)
if __name__ == '__main__': if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -1,26 +1,39 @@
# standard imports # standard imports
import logging
import os import os
import unittest import unittest
import logging
# local imports # local imports
from cic import Proof from cic import Proof
from cic.attachment import Attachment from cic.attachment import Attachment
# test imports # test imports
from tests.base_cic import ( from tests.base_cic import TestCICBase, root_merged_hash, test_data_dir
test_data_dir,
TestCICBase,
root_merged_hash,
)
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger() logg = logging.getLogger()
class TestProof(TestCICBase): class TestProof(TestCICBase):
def test_proof(self):
proof_path = os.path.join(test_data_dir, "proof")
attach = Attachment(proof_path, writer=self.outputs_writer)
attach.load()
c = Proof(path=proof_path, attachments=attach)
c.load()
self.assertEquals(
str(c),
"""description = foo bar baz
issuer = the man
namespace = ge
version = 0
proofs[0] = 2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae
proofs[1] = fcde2b2edba56bf408601fb721fe9b5c338d10ee429ea04fae5511b68fbf8fb9
""",
)
def test_proof_serialize_merge(self): def test_proof_serialize_merge(self):
proof_path = os.path.join(test_data_dir, 'proof') proof_path = os.path.join(test_data_dir, "proof")
attach = Attachment(proof_path, writer=self.outputs_writer) attach = Attachment(proof_path, writer=self.outputs_writer)
attach.load() attach.load()
@ -31,5 +44,5 @@ class TestProof(TestCICBase):
self.assertEqual(v, root_merged_hash) self.assertEqual(v, root_merged_hash)
if __name__ == '__main__': if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -1,7 +1,8 @@
{ {
"name": "", "name": "Test",
"location": "", "location": "Kilifi",
"country_code": "", "country_code": "KE",
"contact": { "contact": {
"phone": "0700-123456"
} }
} }