fix: broken imports
Some checks reported errors
continuous-integration/drone/pr Build encountered an error
continuous-integration/drone/push Build is failing

This commit is contained in:
William Luke 2022-03-01 11:17:17 +03:00
parent a2dfdbedb5
commit 4f219e3d18
13 changed files with 116 additions and 346 deletions

View File

@ -4,17 +4,19 @@ import logging
import os
from typing import Optional
# local imports
from cic import ContractProcessor, Proof
from cic.attachment import Attachment
from cic.meta import Meta, MetadataWriter
from cic.network import Network
from cic.writers import HTTPWriter, KeyedWriterFactory
from cic.token import Token
# external imports
from cic_types.ext.metadata import MetadataRequestsHandler
from cic_types.ext.metadata.signer import Signer as MetadataSigner
# local imports
from cic.contract.processor import ContractProcessor
from cic.contract.components.proof import Proof
from cic.contract.components.attachment import Attachment
from cic.contract.components.meta import Meta
from cic.contract.components.network import Network
from cic.contract.components.token import Token
from cic.writers import HTTPWriter, KeyedWriterFactory, MetadataWriter
logg = logging.getLogger(__name__)

View File

@ -4,7 +4,7 @@ import importlib
# external imports
from chainlib.chain import ChainSpec
# local imports
from cic.network import Network
from cic.contract.components.network import Network
def process_args(argparser):

View File

@ -3,11 +3,11 @@ import logging
import os
# local imports
from cic import Proof
from cic.meta import Meta
from cic.attachment import Attachment
from cic.network import Network
from cic.token import Token
from cic.contract.components.proof import Proof
from cic.contract.components.meta import Meta
from cic.contract.components.attachment import Attachment
from cic.contract.components.network import Network
from cic.contract.components.token import Token
logg = logging.getLogger(__name__)

View File

@ -1,9 +1,9 @@
# local imports
from cic import Proof
from cic.meta import Meta
from cic.attachment import Attachment
from cic.network import Network
from cic.token import Token
from cic.contract.components.proof import Proof
from cic.contract.components.meta import Meta
from cic.contract.components.attachment import Attachment
from cic.contract.components.network import Network
from cic.contract.components.token import Token
def process_args(argparser):

View File

@ -13,13 +13,12 @@ import requests
from chainlib.chain import ChainSpec
# local imports
from cic import Proof
from cic.actions.deploy import deploy
from cic.actions.types import Contract, Options
from cic.attachment import Attachment
from cic.meta import Meta
from cic.network import Network
from cic.token import Token
from cic.contract.components.proof import Proof
from cic.contract.components.attachment import Attachment
from cic.contract.components.meta import Meta
from cic.contract.components.network import Network
from cic.contract.components.token import Token
from cic.contract.contract import generate_contract, load_contract, deploy_contract
if TYPE_CHECKING:
from chainlib.cli.config import Config
@ -62,237 +61,8 @@ def validate_args(_args):
pass
CONTRACTS = [
{
"url": "https://gitlab.com/cicnet/eth-erc20/-/raw/master/python/giftable_erc20_token/data/GiftableToken",
"name": "Giftable Token",
},
{
"url": "https://gitlab.com/cicnet/erc20-demurrage-token/-/raw/master/python/erc20_demurrage_token/data/DemurrageTokenSingleNocap",
"name": "Demurrage Token Single No Cap",
},
]
# Download File from Url
def download_file(url: str, directory: str, filename=None) -> (str, bytes):
os.makedirs(directory, exist_ok=True)
filename = filename if filename else url.split("/")[-1]
path = os.path.join(directory, filename)
if not os.path.exists(path):
log.debug(f"Downloading {filename}")
r = requests.get(url, allow_redirects=True)
open(path, "wb").write(r.content)
return path
return path
def get_contract_args(data: list):
for item in data:
if item["type"] == "constructor":
return item["inputs"]
raise Exception("No constructor found in contract")
def print_contract_args(json_path: str):
json_data = json.load(open(json_path, encoding="utf-8"))
print("Contract Args:")
for contract_arg in get_contract_args(json_data):
print(
f"\t{contract_arg.get('name', '<no name>')} - {contract_arg.get('type', '<no type>')}"
)
def select_contract():
print("Contracts:")
print("\t C - Custom (path/url to contract)")
for idx, contract in enumerate(CONTRACTS):
print(f"\t {idx} - {contract['name']}")
val = input("Select contract (C,0,1..): ")
if val.isdigit() and int(val) < len(CONTRACTS):
contract = CONTRACTS[int(val)]
directory = f"./contracts/{contract['name']}"
bin_path = os.path.abspath(download_file(contract["url"] + ".bin", directory))
json_path = download_file(contract["url"] + ".json", directory)
elif val == "C":
possible_bin_location = input("Enter path/url to contract: ")
# possible_bin_location is path
if possible_bin_location[0] == "." or possible_bin_location[0] == "/":
if os.path.exists(possible_bin_location):
bin_path = os.path.abspath(possible_bin_location)
else:
raise Exception(f"File {possible_bin_location} does not exist")
possible_json_path = val.replace(".bin", ".json")
if os.path.exists(possible_json_path):
json_path = possible_json_path
# possible_bin_location is url
else:
bin_path = download_file(possible_bin_location, directory)
else:
print("Invalid selection")
exit(1)
contract_extra_args = []
contract_extra_args_types = []
if os.path.exists(json_path):
json_data = json.load(open(json_path, encoding="utf-8"))
for contract_arg in get_contract_args(json_data):
arg_name = contract_arg.get("name")
arg_type = contract_arg.get("type")
if arg_name not in ["_decimals", "_name", "_symbol"]:
val = input(f"Enter value for {arg_name} ({arg_type}): ")
contract_extra_args.append(val)
if arg_type == "uint128":
contract_extra_args_types.append("uint256")
else:
contract_extra_args_types.append(arg_type)
return {
"bin_path": bin_path,
"json_path": json_path,
"extra_args": contract_extra_args,
"extra_args_types": contract_extra_args_types,
}
def init_token(directory: str, code=""):
contract = select_contract()
code = contract["bin_path"]
contract_extra_args = contract["extra_args"]
contract_extra_args_types = contract["extra_args_types"]
name = input("Enter Token Name (Foo Token): ") or "Foo Token"
symbol = input("Enter Token Symbol (FOO): ") or "FOO"
precision = input("Enter Token Precision (6): ") or 6
supply = input("Enter Token Supply (0): ") or 0
contract_token = Token(
directory,
name=name,
symbol=symbol,
precision=precision,
extra_args=contract_extra_args,
extra_args_types=contract_extra_args_types,
supply=supply,
code=code,
)
contract_token.start()
return contract_token
def init_proof(directory):
description = input("Enter Proof Description (None): ") or None
namespace = input("Enter Proof Namespace (ge): ") or "ge"
issuer = input("Enter Proof Issuer (None): ") or None
contract_proof = Proof(directory, description, namespace, issuer)
contract_proof.start()
return contract_proof
def init_meta(directory):
name = input("Enter Name (None): ") or ""
country_code = input("Enter Country Code (KE): ") or "KE"
location = input("Enter Location (None): ") or ""
adding_contact_info = True
contact = {}
while adding_contact_info:
value = input("Enter contact info (e.g 'phone: +254723522718'): ") or None
if value:
data = value.split(":")
if len(data) != 2:
print("Invalid contact info, you must enter in the format 'key: value'")
continue
contact[data[0].strip()] = data[1].strip()
else:
adding_contact_info = False
contract_meta = Meta(
directory,
name=name,
country_code=country_code,
location=location,
contact=contact,
)
contract_meta.start()
return contract_meta
def init_attachment(directory):
contract_attchment = Attachment(directory)
contract_attchment.start()
input(
f"Please add attachment files to '{os.path.abspath(os.path.join(directory,'attachments'))}' and then press ENTER to continue"
)
contract_attchment.load()
return contract_attchment
def load_contract(directory) -> Contract:
token = Token(path=directory)
proof = Proof(path=directory)
meta = Meta(path=directory)
attachment = Attachment(path=directory)
network = Network(directory)
token.load()
proof.load()
meta.load()
attachment.load()
network.load()
return Contract(
token=token, proof=proof, meta=meta, attachment=attachment, network=network
)
def init_network(
directory,
options: Options,
targets: List[str],
):
contract_network = Network(directory, targets=targets)
contract_network.start()
for target in targets:
m = importlib.import_module(f"cic.ext.{target}.start")
m.extension_start(
contract_network,
registry_address=options.contract_registry,
chain_spec=options.chain_spec,
rpc_provider=options.rpc_provider,
key_account_address=options.key_account,
)
contract_network.load()
return contract_network
def generate(directory: str, target: str, options: Options) -> Contract:
if os.path.exists(directory):
contine = input(
"Directory already exists, Would you like to delete it? (y/n): "
)
if contine.lower() != "y":
print("Exiting")
exit(1)
else:
print(f"Deleted {directory}")
os.system(f"rm -rf {directory}")
os.makedirs(directory)
token = init_token(directory)
proof = init_proof(directory)
meta = init_meta(directory)
attachment = init_attachment(directory)
network = init_network(
directory,
options,
targets=[target],
)
return Contract(
token=token, proof=proof, meta=meta, attachment=attachment, network=network
)
def get_options(config: Config, eargs) -> Options:
def get_options(config: Config, eargs):
# Defaults
default_contract_registry = config.get(
"CIC_REGISTRY_ADDRESS"
@ -333,7 +103,7 @@ def get_options(config: Config, eargs) -> Options:
auth_keyfile_path = config.get("AUTH_KEYFILE_PATH")
auth_db_path = config.get("AUTH_DB_PATH")
options = Options(
options = [
auth_db_path,
auth_keyfile_path,
auth_passphrase,
@ -344,14 +114,11 @@ def get_options(config: Config, eargs) -> Options:
metadata_endpoint,
default_wallet_keyfile,
default_wallet_passphrase,
)
]
print(options)
return options
ExtraArgs = {"skip_gen": str, "skip_deploy": str, "target": str, "path": str, "p": str}
@ -362,22 +129,19 @@ def execute(config, eargs: ExtraArgs):
skip_gen = eargs.skip_gen
skip_deploy = eargs.skip_deploy
options = get_options(config, eargs)
if not skip_gen:
contract = generate(directory, target, options)
contract = generate_contract(directory, [target], config, interactive=True)
else:
contract = load_contract(directory)
print_contract(contract)
print(contract)
if not skip_deploy:
ready_to_deploy = input("Ready to deploy? (y/n): ")
if ready_to_deploy == "y":
deploy(
deploy_contract(
config=config,
contract_directory=directory,
options=options,
target=target,
)
print("Deployed")

View File

@ -3,7 +3,7 @@ import os
import hashlib
mod_dir = os.path.dirname(os.path.realpath(__file__))
mod_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..')
root_dir = os.path.join(mod_dir, '..')
data_dir = os.path.join(mod_dir, 'data')
schema_dir = os.path.join(mod_dir, 'schema')

View File

@ -4,8 +4,6 @@ from __future__ import annotations
import os
import json
import logging
import base64
from typing import TYPE_CHECKING
# external imports
from cic_types import MetadataPointer
@ -14,8 +12,6 @@ from hexathon import strip_0x
# local imports
from cic.contract.base import Data, data_dir
from cic.writers import OutputWriter
from cic_types.ext.metadata import MetadataRequestsHandler
from cic.utils import object_to_str
logg = logging.getLogger(__name__)
@ -139,25 +135,3 @@ class Meta(Data):
def __str__(self):
return object_to_str(self, ["name", "contact", "country_code", "location"])
class MetadataWriter(OutputWriter):
"""Custom writer for publishing data under immutable content-addressed pointers in the cic-meta storage backend.
Data that is not utf-8 will be converted to base64 before publishing.
Implements cic.writers.OutputWriter
"""
def write(self, k, v):
rq = MetadataRequestsHandler(MetadataPointer.NONE, bytes.fromhex(k))
try:
v = v.decode("utf-8")
v = json.loads(v)
logg.debug(f"metadatawriter bindecode {k} {v}")
except UnicodeDecodeError:
v = base64.b64encode(v).decode("utf-8")
v = json.loads(json.dumps(v, separators=(",", ":")))
logg.debug(f"metadatawriter b64encode {k} {v}")
r = rq.create(v)
logg.info(f"metadata submitted at {k}")
return r

View File

@ -7,7 +7,7 @@ import logging
from chainlib.chain import ChainSpec
# local imports
from cic.contract.components.base import Data, data_dir
from cic.contract.base import Data, data_dir
logg = logging.getLogger(__name__)
@ -35,9 +35,8 @@ class Network(Data):
"""
super(Network, self).load()
f = open(self.network_path, 'r')
o = json.load(f)
f.close()
with open(self.network_path, 'r', encoding='utf-8') as f:
o = json.load(f)
self.resources = o['resources']
@ -53,9 +52,8 @@ class Network(Data):
network_template_file_path = os.path.join(data_dir, f'network_template_v{self.version()}.json')
f = open(network_template_file_path)
o_part = json.load(f)
f.close()
with open(network_template_file_path, encoding='utf-8') as f:
o_part = json.load(f)
self.resources = {}
for v in self.targets:
@ -67,11 +65,10 @@ class Network(Data):
def save(self):
"""Save network settings to file.
"""
f = open(self.network_path, 'w')
json.dump({
'resources': self.resources,
}, f, sort_keys=True, indent="\t")
f.close()
with open(self.network_path, 'w', encoding='utf-8') as f:
json.dump({
'resources': self.resources,
}, f, sort_keys=True, indent="\t")
def resource(self, k):
@ -83,8 +80,8 @@ class Network(Data):
:return: Extension settings
"""
v = self.resources.get(k)
if v == None:
raise AttributeError('no defined reference for {}'.format(k))
if v is None:
raise AttributeError(f'No defined reference for {k}')
return v
@ -129,7 +126,7 @@ class Network(Data):
"""
chain_spec_dict = chain_spec.asdict()
for k in chain_spec_dict.keys():
logg.debug('resources {}'.format(self.resources))
logg.debug(f'resources: {self.resources}')
self.resources[resource_key]['chain_spec'][k] = chain_spec_dict[k]

View File

@ -10,16 +10,16 @@ import requests
from cic_types.ext.metadata import MetadataRequestsHandler
from cic_types.ext.metadata.signer import Signer as MetadataSigner
from chainlib.cli.config import Config
from chainlib.chain import ChainSpec
# Local Modules
from cic.contract import ContractProcessor
from cic.contract.processor import ContractProcessor
from cic.contract.components.attachment import Attachment
from cic.contract.components.meta import Meta
from cic.contract.components.network import Network
from cic.contract.components.proof import Proof
from cic.contract.components.token import Token
from cic.contract.helpers import init_writers_from_config
from cic.writers import HTTPWriter, KeyedWriterFactory, OutputWriter
from cic.writers import HTTPWriter, KeyedWriterFactory, OutputWriter, MetadataWriter
log = logging.getLogger(__name__)
@ -78,35 +78,45 @@ def generate_contract(
"Directory already exists, Would you like to delete it? (y/n): "
)
if contine.lower() != "y":
print("Exiting")
exit(1)
print("Trying to load existing contract")
return load_contract(directory)
else:
print(f"Deleted {directory}")
os.system(f"rm -rf {directory}")
os.makedirs(directory)
log.debug("Generating token")
token = Token(directory, interactive=interactive)
token.start()
log.debug("Generating proof")
proof = Proof(directory, interactive=interactive)
proof.start()
log.debug("Generating meta")
meta = Meta(directory, interactive=interactive)
meta.start()
log.debug("Generating attachment")
attachment = Attachment(directory, interactive=interactive)
log.debug("Generating network")
network = Network(directory, targets=targets)
network.start()
log.debug(f"""Populating infomation from network:
CIC_REGISTRY_ADDRESS: {config.get("CIC_REGISTRY_ADDRESS")}
CHAIN_SPEC: {config.get("CHAIN_SPEC")}
RPC_PROVIDER: {config.get("RPC_PROVIDER")}
AUTH_KEY: {config.get("AUTH_KEY")}
""")
for target in targets:
m = importlib.import_module(f"cic.ext.{target}.start")
m.extension_start(
network,
registry_address=config.get("CIC_REGISTRY_ADDRESS"),
chain_spec=config.get("CHAIN_SPEC"),
chain_spec=ChainSpec.from_chain_str(config.get("CHAIN_SPEC")),
rpc_provider=config.get("RPC_PROVIDER"),
key_account_address=config.get("RPC_PROVIDER"),
key_account_address=config.get("AUTH_KEY"), # TODO this should come from the wallet keystore
)
network.load()
@ -115,7 +125,7 @@ def generate_contract(
)
def deploy(
def deploy_contract(
config: Config,
target: str,
contract_directory: str,
@ -133,7 +143,7 @@ def deploy(
if metadata_endpoint is not None:
MetadataRequestsHandler.base_url = metadata_endpoint
MetadataSigner.gpg_path = "/tmp"
MetadataSigner.key_file_path = config.get("AUTH_KEYFILE")
MetadataSigner.key_file_path = config.get("AUTH_KEYFILE_PATH")
MetadataSigner.gpg_passphrase = config.get("AUTH_PASSPHRASE")
writers["proof"] = KeyedWriterFactory(MetadataWriter, HTTPWriter).new
writers["attachment"] = KeyedWriterFactory(None, HTTPWriter).new
@ -169,6 +179,8 @@ def deploy(
chain_spec = cn.chain_spec
config.add(chain_spec, "CHAIN_SPEC", exists_ok=True)
log.debug(f"using CHAIN_SPEC: {str(chain_spec)} from network")
print(chain_spec)
signer_hint = config.get("WALLET_KEY_FILE")
(rpc, signer) = cmd_mod.parse_adapter(config, signer_hint)

View File

@ -4,6 +4,7 @@ import logging
import sys
import json
import requests
import importlib
# local imports
from cic.writers import OutputWriter

View File

@ -1,14 +1,18 @@
# standard imports
import base64
import logging
import os
import sys
import logging
import urllib.request
from typing import TYPE_CHECKING
from cic_types.ext.metadata import MetadataRequestsHandler
logg = logging.getLogger(__name__)
class OutputWriter:
def __init__(self, *args, **kwargs):
pass
@ -17,13 +21,11 @@ class OutputWriter:
class StdoutWriter(OutputWriter):
def write(self, k, v):
sys.stdout.write('{}\t{}\n'.format(k, v))
sys.stdout.write("{}\t{}\n".format(k, v))
class KVWriter(OutputWriter):
def __init__(self, path=None, *args, **kwargs):
try:
os.stat(path)
@ -31,43 +33,38 @@ class KVWriter(OutputWriter):
os.makedirs(path)
self.path = path
def write(self, k, v):
fp = os.path.join(self.path, str(k))
logg.debug('path write {} {}'.format(fp, str(v)))
f = open(fp, 'wb')
logg.debug("path write {} {}".format(fp, str(v)))
f = open(fp, "wb")
f.write(v)
f.close()
class HTTPWriter(OutputWriter):
def __init__(self, path=None, *args, **kwargs):
super(HTTPWriter, self).__init__(*args, **kwargs)
self.path = path
def write(self, k, v):
path = self.path
if k != None:
path = os.path.join(path, k)
logg.debug(f'http writer post {path} \n key: {k}, value: {v}')
rq = urllib.request.Request(path, method='POST', data=v)
logg.debug(f"http writer post {path} \n key: {k}, value: {v}")
rq = urllib.request.Request(path, method="POST", data=v)
r = urllib.request.urlopen(rq)
logg.info('http writer submitted at {}'.format(r.read()))
logg.info("http writer submitted at {}".format(r.read()))
class KeyedWriter(OutputWriter):
def __init__(self, writer_keyed, writer_immutable):
self.writer_keyed = writer_keyed
self.writer_immutable = writer_immutable
def write(self, key, value):
logg.debug(f'writing keywriter key: {key} value: {value}')
logg.debug(f"writing keywriter key: {key} value: {value}")
if isinstance(value, str):
value = value.encode('utf-8')
value = value.encode("utf-8")
if self.writer_keyed != None:
self.writer_keyed.write(key, value)
if self.writer_immutable != None:
@ -75,15 +72,15 @@ class KeyedWriter(OutputWriter):
class KeyedWriterFactory:
def __init__(self, key_writer_constructor, immutable_writer_constructor, *args, **kwargs):
def __init__(
self, key_writer_constructor, immutable_writer_constructor, *args, **kwargs
):
self.key_writer_constructor = key_writer_constructor
self.immutable_writer_constructor = immutable_writer_constructor
self.x = {}
for k in kwargs.keys():
logg.debug('adding key {} t keyed writer factory'.format(k))
self.x[k] = kwargs[k]
logg.debug("adding key {} t keyed writer factory".format(k))
self.x[k] = kwargs[k]
def new(self, path=None, *args, **kwargs):
writer_keyed = None
@ -93,3 +90,26 @@ class KeyedWriterFactory:
if self.immutable_writer_constructor != None:
writer_immutable = self.immutable_writer_constructor(path, **self.x)
return KeyedWriter(writer_keyed, writer_immutable)
class MetadataWriter(OutputWriter):
"""Custom writer for publishing data under immutable content-addressed pointers in the cic-meta storage backend.
Data that is not utf-8 will be converted to base64 before publishing.
Implements cic.writers.OutputWriter
"""
def write(self, k, v):
rq = MetadataRequestsHandler(MetadataPointer.NONE, bytes.fromhex(k))
try:
v = v.decode("utf-8")
v = json.loads(v)
logg.debug(f"metadatawriter bindecode {k} {v}")
except UnicodeDecodeError:
v = base64.b64encode(v).decode("utf-8")
v = json.loads(json.dumps(v, separators=(",", ":")))
logg.debug(f"metadatawriter b64encode {k} {v}")
r = rq.create(v)
logg.info(f"metadata submitted at {k}")
return r

View File

@ -16,13 +16,13 @@ provider = http://localhost:63545
[auth]
type = gnupg
db_path = ~/.local/share/cic/clicada
db_path = /home/will/.local/share/cic/clicada
key = eb3907ecad74a0013c259d5874ae7f22dcbcc95c
keyfile_path = ~/grassroots/cic-internal-integration/apps/cic-ussd/tests/data/pgp/privatekeys_meta.asc
keyfile_path = /home/will/grassroots/cic-internal-integration/apps/cic-ussd/tests/data/pgp/privatekeys_meta.asc
passphrase = merman
[wallet]
key_file = ~/grassroots/cic-internal-integration/apps/contract-migration/keystore/UTC
key_file = /home/will/grassroots/cic-internal-integration/apps/contract-migration/keystore
passphrase =
[chain]

View File

@ -16,9 +16,9 @@ provider = https://rpc.grassecon.net
[auth]
type = gnupg
db_path = ~/.local/share/cic/clicada
db_path = /home/will/.local/share/cic/clicada
key = CCE2E1D2D0E36ADE0405E2D0995BB21816313BD5
keyfile_path = ~/.config/cic/staff-client/user.asc
keyfile_path = /home/will/.config/cic/staff-client/user.asc
passphrase = queenmarlena
[wallet]