cic-cli/cic/cmd/export.py

109 lines
3.8 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# standard imports
import logging
import importlib
import os
# external imports
from cic_types.ext.metadata import MetadataRequestsHandler
from cic_types.ext.metadata.signer import Signer as MetadataSigner
# local imports
from cic import (
Proof,
Processor,
)
from cic.output import (
HTTPWriter,
KeyedWriterFactory,
)
from cic.meta import (
Meta,
MetadataWriter,
)
from cic.attachment import Attachment
from cic.network import Network
from cic.token import Token
from typing import Optional
logg = logging.getLogger(__name__)
def process_args(argparser):
argparser.add_argument('-d', '--directory', type=str, dest='directory', default='.', help='directory')
argparser.add_argument('-o', '--output-directory', type=str, dest='output_directory', help='output directory')
argparser.add_argument('--metadata-endpoint', dest='metadata_endpoint', type=str, help='metadata endpoint to interact with')
argparser.add_argument('-y', '--signer', type=str, dest='y', help='target-specific signer to use for export')
argparser.add_argument('-p', type=str, help='RPC endpoint')
argparser.add_argument('target', type=str, help='target network type')
def validate_args(args):
pass
def init_writers_from_config(config):
w = {
'meta': None,
'attachment': None,
'proof': None,
'ext': None,
}
for v in w.keys():
k = 'CIC_CORE_{}_WRITER'.format(v.upper())
(d, c) = config.get(k).rsplit('.', maxsplit=1)
m = importlib.import_module(d)
o = getattr(m, c)
w[v] = o
return w
EArgs = {'target': str, 'directory': str, 'output_directory': str, 'metadata_endpoint': Optional[str], 'y': str}
def execute(config, eargs: EArgs):
modname = 'cic.ext.{}'.format(eargs.target)
cmd_mod = importlib.import_module(modname)
writers = init_writers_from_config(config)
output_writer_path_meta = eargs.output_directory
if eargs.metadata_endpoint != None:
MetadataRequestsHandler.base_url = eargs.metadata_endpoint
MetadataSigner.gpg_path = os.path.join('/tmp')
MetadataSigner.key_file_path = '/home/will/grassroots/cic-internal-integration/apps/cic-ussd/tests/data/pgp/privatekeys_meta.asc'
MetadataSigner.gpg_passphrase = 'merman'
writers['proof'] = KeyedWriterFactory(MetadataWriter, HTTPWriter).new
writers['attachment'] = KeyedWriterFactory(None, HTTPWriter).new
writers['meta'] = MetadataWriter
output_writer_path_meta = eargs.metadata_endpoint
ct = Token(path=eargs.directory)
cm = Meta(path=eargs.directory, writer=writers['meta'](path=output_writer_path_meta))
ca = Attachment(path=eargs.directory, writer=writers['attachment'](path=output_writer_path_meta))
cp = Proof(path=eargs.directory, attachments=ca, writer=writers['proof'](path=output_writer_path_meta))
cn = Network(path=eargs.directory)
ca.load()
ct.load()
cp.load()
cm.load()
cn.load()
chain_spec = None
try:
chain_spec = config.get('CHAIN_SPEC')
except KeyError:
chain_spec = cn.chain_spec
config.add(chain_spec, 'CHAIN_SPEC', exists_ok=True)
logg.debug('CHAIN_SPEC config set to {}'.format(str(chain_spec)))
#signer = cmd_mod.parse_signer(eargs.y)
(rpc, signer) = cmd_mod.parse_adapter(config, eargs.y)
ref = cn.resource(eargs.target)
chain_spec = cn.chain_spec(eargs.target)
logg.debug('found reference {} chain spec {} for target {}'.format(ref['contents'], chain_spec, eargs.target))
c = getattr(cmd_mod, 'new')(chain_spec, ref['contents'], cp, signer_hint=signer, rpc=rpc, outputs_writer=writers['ext'](path=eargs.output_directory))
c.apply_token(ct)
p = Processor(proof=cp, attachment=ca, metadata=cm, extensions=[c])
p.process()