cic-cli/cic/cmd/export.py

151 lines
4.3 KiB
Python
Raw Normal View History

# standard imports
import importlib
import logging
import os
from typing import Optional
2021-10-14 15:43:23 +02:00
# external imports
from cic_types.ext.metadata import MetadataRequestsHandler
from cic_types.ext.metadata.signer import Signer as MetadataSigner
2022-03-01 09:17:17 +01:00
# local imports
from cic.contract.processor import ContractProcessor
from cic.contract.components.proof import Proof
from cic.contract.components.attachment import Attachment
from cic.contract.components.meta import Meta
from cic.contract.components.network import Network
from cic.contract.components.token import Token
from cic.writers import HTTPWriter, KeyedWriterFactory, MetadataWriter
logg = logging.getLogger(__name__)
def process_args(argparser):
argparser.add_argument(
"-d", "--directory", type=str, dest="directory", default=".", help="directory"
)
argparser.add_argument(
"-o",
"--output-directory",
type=str,
dest="output_directory",
help="output directory",
)
argparser.add_argument(
"--metadata-endpoint",
dest="metadata_endpoint",
type=str,
help="metadata endpoint to interact with",
)
argparser.add_argument(
"-y",
"--signer",
type=str,
dest="y",
help="target-specific signer to use for export",
)
argparser.add_argument("-p", type=str, help="RPC endpoint")
argparser.add_argument("target", type=str, help="target network type")
def validate_args(args):
pass
def init_writers_from_config(config):
w = {
"meta": None,
"attachment": None,
"proof": None,
"ext": None,
}
for v in w.keys():
k = "CIC_CORE_{}_WRITER".format(v.upper())
(d, c) = config.get(k).rsplit(".", maxsplit=1)
m = importlib.import_module(d)
o = getattr(m, c)
w[v] = o
return w
ExtraArgs = {
"target": str,
"key_file_path": str,
"gpg_passphrase": str,
"directory": str,
"output_directory": str,
"metadata_endpoint": Optional[str],
"y": str,
}
def execute(config, eargs: ExtraArgs):
# !TODO Remove this
eargs.key_file_path = "/home/will/grassroots/cic-internal-integration/apps/cic-ussd/tests/data/pgp/privatekeys_meta.asc"
eargs.gpg_passphrase = "merman"
modname = f"cic.ext.{eargs.target}"
cmd_mod = importlib.import_module(modname)
writers = init_writers_from_config(config)
2021-10-19 11:18:22 +02:00
output_writer_path_meta = eargs.output_directory
if eargs.metadata_endpoint != None:
MetadataRequestsHandler.base_url = eargs.metadata_endpoint
MetadataSigner.gpg_path = os.path.join("/tmp")
MetadataSigner.key_file_path = eargs.key_file_path
MetadataSigner.gpg_passphrase = eargs.gpg_passphrase
writers["proof"] = KeyedWriterFactory(MetadataWriter, HTTPWriter).new
writers["attachment"] = KeyedWriterFactory(None, HTTPWriter).new
writers["meta"] = MetadataWriter
2021-10-19 11:18:22 +02:00
output_writer_path_meta = eargs.metadata_endpoint
2021-10-10 14:49:22 +02:00
ct = Token(path=eargs.directory)
cm = Meta(
path=eargs.directory, writer=writers["meta"](path=output_writer_path_meta)
)
ca = Attachment(
path=eargs.directory, writer=writers["attachment"](path=output_writer_path_meta)
)
cp = Proof(
path=eargs.directory,
attachments=ca,
writer=writers["proof"](path=output_writer_path_meta),
)
2021-10-10 14:49:22 +02:00
cn = Network(path=eargs.directory)
ca.load()
2021-10-10 14:49:22 +02:00
ct.load()
cp.load()
cm.load()
cn.load()
chain_spec = None
try:
chain_spec = config.get("CHAIN_SPEC")
except KeyError:
chain_spec = cn.chain_spec
config.add(chain_spec, "CHAIN_SPEC", exists_ok=True)
logg.debug(f"CHAIN_SPEC config set to {str(chain_spec)}")
# signer = cmd_mod.parse_signer(eargs.y)
(rpc, signer) = cmd_mod.parse_adapter(config, eargs.y)
ref = cn.resource(eargs.target)
chain_spec = cn.chain_spec(eargs.target)
logg.debug(
f"found reference {ref['contents']} chain spec {chain_spec} for target {eargs.target}"
)
c = getattr(cmd_mod, "new")(
chain_spec,
ref["contents"],
cp,
signer_hint=signer,
rpc=rpc,
outputs_writer=writers["ext"](path=eargs.output_directory),
)
c.apply_token(ct)
p = ContractProcessor(proof=cp, attachment=ca, metadata=cm, extensions=[c])
p.process()