cic-cli/cic/cmd/export.py

151 lines
4.3 KiB
Python

# standard imports
import importlib
import logging
import os
from typing import Optional
# external imports
from cic_types.ext.metadata import MetadataRequestsHandler
from cic_types.ext.metadata.signer import Signer as MetadataSigner
# local imports
from cic.contract.processor import ContractProcessor
from cic.contract.components.proof import Proof
from cic.contract.components.attachment import Attachment
from cic.contract.components.meta import Meta
from cic.contract.network import Network
from cic.contract.components.token import Token
from cic.writers import HTTPWriter, KeyedWriterFactory, MetadataWriter
logg = logging.getLogger(__name__)
def process_args(argparser):
    """Register the command line arguments of the export command.

    Adds the options ``-d/--directory``, ``-o/--output-directory``,
    ``--metadata-endpoint``, ``-y/--signer`` and ``-p``, followed by the
    positional ``target`` argument.

    :param argparser: Parser object exposing ``add_argument``.
    """
    # (flags, keyword options) pairs, kept in registration order
    argument_specs = (
        (
            ("-d", "--directory"),
            dict(type=str, dest="directory", default=".", help="directory"),
        ),
        (
            ("-o", "--output-directory"),
            dict(type=str, dest="output_directory", help="output directory"),
        ),
        (
            ("--metadata-endpoint",),
            dict(
                dest="metadata_endpoint",
                type=str,
                help="metadata endpoint to interact with",
            ),
        ),
        (
            ("-y", "--signer"),
            dict(
                type=str,
                dest="y",
                help="target-specific signer to use for export",
            ),
        ),
        (("-p",), dict(type=str, help="RPC endpoint")),
        (("target",), dict(type=str, help="target network type")),
    )
    for flags, options in argument_specs:
        argparser.add_argument(*flags, **options)
def validate_args(args):
    """Validate the parsed arguments of the export command.

    Placeholder: no checks are performed and nothing is returned.

    :param args: Parsed command line arguments (currently unused).
    """
    return None
def init_writers_from_config(config):
    """Resolve the writer objects named in the configuration.

    For each of the components ``meta``, ``attachment``, ``proof`` and
    ``ext`` the config value stored under ``CIC_CORE_<COMPONENT>_WRITER`` is
    read as a dotted ``module.attribute`` path. The module is imported and
    the attribute is looked up on it.

    :param config: Object whose ``get(key)`` returns the dotted path string.
    :return: Dict mapping each component name to the resolved attribute.
    """
    resolved = {}
    for component in ("meta", "attachment", "proof", "ext"):
        config_key = f"CIC_CORE_{component.upper()}_WRITER"
        dotted_path = config.get(config_key)
        # split on the last dot only: everything before it is the module path
        module_path, attribute_name = dotted_path.rsplit(".", maxsplit=1)
        module = importlib.import_module(module_path)
        resolved[component] = getattr(module, attribute_name)
    return resolved
# Names and types of the attributes carried by the ``eargs`` object handed to
# ``execute``. This is a plain dict used as an annotation; nothing here
# enforces it at runtime.
ExtraArgs = dict(
    target=str,
    key_file_path=str,
    gpg_passphrase=str,
    directory=str,
    output_directory=str,
    metadata_endpoint=Optional[str],
    y=str,
)
def execute(config, eargs: ExtraArgs):
    """Export the contract definition in ``eargs.directory`` to the target network.

    Steps, in order:

    1. Import the target extension module ``cic.ext.<eargs.target>``.
    2. Resolve the writers from ``config``. If ``eargs.metadata_endpoint``
       is set, the meta, attachment and proof writers are replaced with
       metadata/HTTP writers pointed at that endpoint, and the metadata
       signer is configured.
    3. Load the token, meta, attachment, proof and network components from
       ``eargs.directory``.
    4. Make sure ``CHAIN_SPEC`` is present in ``config``, taking it from the
       network definition of the target when missing.
    5. Build the target extension with the module's ``new`` factory, apply
       the token to it and run the ``ContractProcessor``.

    :param config: Configuration object; must support ``get(key)`` (raising
        ``KeyError`` for a missing ``CHAIN_SPEC``) and
        ``add(value, key, exists_ok=...)``.
    :param eargs: Object carrying the attributes listed in ``ExtraArgs``.
        ``key_file_path`` and ``gpg_passphrase`` are optional; see the
        fallback note in the body.
    :raises ImportError: If no extension module exists for ``eargs.target``.
    """
    cmd_mod = importlib.import_module(f"cic.ext.{eargs.target}")

    writers = init_writers_from_config(config)
    output_writer_path_meta = eargs.output_directory

    if eargs.metadata_endpoint is not None:
        # Use the signing material supplied by the caller instead of
        # unconditionally overwriting it (and mutating ``eargs``).
        # TODO: remove these development fallbacks once every caller passes
        # key_file_path / gpg_passphrase explicitly; credentials do not
        # belong in source.
        key_file_path = getattr(eargs, "key_file_path", None)
        if key_file_path is None:
            logg.warning("no key_file_path supplied, using built-in development default")
            key_file_path = "/home/will/grassroots/cic-internal-integration/apps/cic-ussd/tests/data/pgp/privatekeys_meta.asc"
        gpg_passphrase = getattr(eargs, "gpg_passphrase", None)
        if gpg_passphrase is None:
            logg.warning("no gpg_passphrase supplied, using built-in development default")
            gpg_passphrase = "merman"

        MetadataRequestsHandler.base_url = eargs.metadata_endpoint
        MetadataSigner.gpg_path = "/tmp"
        MetadataSigner.key_file_path = key_file_path
        MetadataSigner.gpg_passphrase = gpg_passphrase
        writers["proof"] = KeyedWriterFactory(MetadataWriter, HTTPWriter).new
        writers["attachment"] = KeyedWriterFactory(None, HTTPWriter).new
        writers["meta"] = MetadataWriter
        # metadata-backed writers take the endpoint as their "path"
        output_writer_path_meta = eargs.metadata_endpoint

    ct = Token(path=eargs.directory)
    cm = Meta(
        path=eargs.directory,
        writer=writers["meta"](path=output_writer_path_meta),
    )
    ca = Attachment(
        path=eargs.directory,
        writer=writers["attachment"](path=output_writer_path_meta),
    )
    cp = Proof(
        path=eargs.directory,
        attachments=ca,
        writer=writers["proof"](path=output_writer_path_meta),
    )
    cn = Network(path=eargs.directory)

    # load order is kept as in the original implementation
    ca.load()
    ct.load()
    cp.load()
    cm.load()
    cn.load()

    try:
        config.get("CHAIN_SPEC")
    except KeyError:
        # ``cn.chain_spec`` is a method taking the target name; it must be
        # called, otherwise the bound method itself ends up in the config.
        # NOTE(review): confirm whether consumers of CHAIN_SPEC expect the
        # chain spec object or its string form.
        fallback_chain_spec = cn.chain_spec(eargs.target)
        config.add(fallback_chain_spec, "CHAIN_SPEC", exists_ok=True)
        logg.debug(f"CHAIN_SPEC config set to {str(fallback_chain_spec)}")

    (rpc, signer) = cmd_mod.parse_adapter(config, eargs.y)

    ref = cn.resource(eargs.target)
    chain_spec = cn.chain_spec(eargs.target)
    logg.debug(
        f"found reference {ref['contents']} chain spec {chain_spec} for target {eargs.target}"
    )

    c = cmd_mod.new(
        chain_spec,
        ref["contents"],
        cp,
        signer_hint=signer,
        rpc=rpc,
        outputs_writer=writers["ext"](path=eargs.output_directory),
    )
    c.apply_token(ct)

    p = ContractProcessor(proof=cp, attachment=ca, metadata=cm, extensions=[c])
    p.process()