# cic-cli/cic/cmd/export.py

# standard imports
import importlib
import logging
import os
from typing import Optional
2021-10-14 15:43:23 +02:00
# external imports
from cic_types.ext.metadata import MetadataRequestsHandler
from cic_types.ext.metadata.signer import Signer as MetadataSigner
2022-03-01 09:17:17 +01:00
# local imports
from cic.contract.processor import ContractProcessor
from cic.contract.components.proof import Proof
2022-03-01 09:17:17 +01:00
from cic.contract.components.attachment import Attachment
from cic.contract.components.meta import Meta
from cic.contract.network import Network
2022-03-01 09:17:17 +01:00
from cic.contract.components.token import Token
from cic.writers import HTTPWriter, KeyedWriterFactory, MetadataWriter
# Module-level logger, named after this module.
logg = logging.getLogger(__name__)
def process_args(argparser):
    """Register the command line arguments of the export command.

    Arguments are added in a fixed order: ``-d/--directory``,
    ``-o/--output-directory``, ``--metadata-endpoint``, ``-y/--signer``,
    ``-p`` and the positional ``target``.

    :param argparser: Object exposing an argparse-style ``add_argument``.
    """
    # Each entry is (flags, keyword options); order matters for help output.
    option_specs = (
        (
            ("-d", "--directory"),
            {"type": str, "dest": "directory", "default": ".", "help": "directory"},
        ),
        (
            ("-o", "--output-directory"),
            {"type": str, "dest": "output_directory", "help": "output directory"},
        ),
        (
            ("--metadata-endpoint",),
            {
                "dest": "metadata_endpoint",
                "type": str,
                "help": "metadata endpoint to interact with",
            },
        ),
        (
            ("-y", "--signer"),
            {
                "type": str,
                "dest": "y",
                "help": "target-specific signer to use for export",
            },
        ),
        (("-p",), {"type": str, "help": "RPC endpoint"}),
        (("target",), {"type": str, "help": "target network type"}),
    )
    for flags, options in option_specs:
        argparser.add_argument(*flags, **options)
def validate_args(args):
    """Validate parsed arguments for the export command.

    Currently a no-op: nothing is inspected and ``None`` is returned.

    :param args: Parsed arguments (unused).
    """
    return None
def init_writers_from_config(config):
    """Resolve the configured writer objects for each export component.

    For each of ``meta``, ``attachment``, ``proof`` and ``ext`` the config key
    ``CIC_CORE_<NAME>_WRITER`` is read as a dotted path, split at the last dot
    into a module path and an attribute name, and the attribute is imported.

    :param config: Object whose ``get(key)`` returns the dotted path string.
    :return: Dict mapping component name to the imported attribute.
    """

    def _resolve(component):
        # "package.module.Name" -> ("package.module", "Name")
        dotted_path = config.get("CIC_CORE_{}_WRITER".format(component.upper()))
        module_path, attr_name = dotted_path.rsplit(".", maxsplit=1)
        return getattr(importlib.import_module(module_path), attr_name)

    return {
        component: _resolve(component)
        for component in ("meta", "attachment", "proof", "ext")
    }
# Attribute names (and their types) that execute() reads from its ``eargs``
# argument.
ExtraArgs = dict(
    target=str,
    directory=str,
    output_directory=str,
    metadata_endpoint=Optional[str],
    y=str,
)
def execute(config, eargs: ExtraArgs):
    """Export the contract components in ``eargs.directory`` for a target.

    The target-specific module ``cic.ext.<target>`` is imported and asked for
    an adapter (``parse_adapter``) and an extension (``new``). Token, meta,
    attachment, proof and network components are loaded from
    ``eargs.directory`` and handed to a ``ContractProcessor``, which is then
    run.

    When ``eargs.metadata_endpoint`` is set, the meta, attachment and proof
    writers from config are replaced by HTTP/metadata writers pointed at that
    endpoint; otherwise the configured writers write to
    ``eargs.output_directory``.

    :param config: Configuration object providing ``get`` and ``add``.
    :param eargs: Object with the attributes ``target``, ``directory``,
        ``output_directory``, ``metadata_endpoint`` and ``y``.
    :raises ImportError: If ``cic.ext.<target>`` cannot be imported.
    """
    cmd_mod = importlib.import_module(f"cic.ext.{eargs.target}")

    writers = init_writers_from_config(config)
    output_writer_path_meta = eargs.output_directory

    if eargs.metadata_endpoint is not None:
        MetadataRequestsHandler.base_url = eargs.metadata_endpoint
        # NOTE(review): gpg working directory is hard-coded to /tmp.
        MetadataSigner.gpg_path = os.path.join("/tmp")
        MetadataSigner.key_file_path = config.get("AUTH_KEYFILE_PATH")
        MetadataSigner.gpg_passphrase = config.get("AUTH_PASSPHRASE")
        writers["proof"] = KeyedWriterFactory(MetadataWriter, HTTPWriter).new
        writers["attachment"] = KeyedWriterFactory(None, HTTPWriter).new
        writers["meta"] = MetadataWriter
        # The metadata/attachment/proof writers now target the endpoint
        # instead of the local output directory.
        output_writer_path_meta = eargs.metadata_endpoint

    ct = Token(path=eargs.directory)
    cm = Meta(
        path=eargs.directory,
        writer=writers["meta"](path=output_writer_path_meta),
    )
    ca = Attachment(
        path=eargs.directory,
        writer=writers["attachment"](path=output_writer_path_meta),
    )
    cp = Proof(
        path=eargs.directory,
        attachments=ca,
        writer=writers["proof"](path=output_writer_path_meta),
    )
    cn = Network(path=eargs.directory)

    ca.load()
    ct.load()
    cp.load()
    cm.load()
    cn.load()

    # Make sure CHAIN_SPEC is present in config before the adapter is built.
    try:
        config.get("CHAIN_SPEC")
    except KeyError:
        # cn.chain_spec is a method taking the target key; it must be called,
        # otherwise the bound method itself would be stored in config.
        fallback_chain_spec = cn.chain_spec(eargs.target)
        config.add(fallback_chain_spec, "CHAIN_SPEC", exists_ok=True)
        logg.debug(f"CHAIN_SPEC config set to {str(fallback_chain_spec)}")

    (rpc, signer) = cmd_mod.parse_adapter(config, eargs.y)

    ref = cn.resource(eargs.target)
    chain_spec = cn.chain_spec(eargs.target)
    logg.debug(
        f"found reference {ref['contents']} chain spec {chain_spec} for target {eargs.target}"
    )

    c = getattr(cmd_mod, "new")(
        chain_spec,
        ref["contents"],
        cp,
        signer_hint=signer,
        rpc=rpc,
        outputs_writer=writers["ext"](path=eargs.output_directory),
    )
    c.apply_token(ct)

    p = ContractProcessor(proof=cp, attachment=ca, metadata=cm, extensions=[c])
    p.process()