cic-cli/cic/actions/deploy.py

97 lines
3.1 KiB
Python
Raw Normal View History

2022-02-10 09:22:21 +01:00
# standard imports
import logging
import importlib
import os
# external imports
from cic_types.ext.metadata import MetadataRequestsHandler
from cic_types.ext.metadata.signer import Signer as MetadataSigner
# local imports
from cic import (
Proof,
Processor,
)
from cic.output import (
HTTPWriter,
KeyedWriterFactory,
)
from cic.meta import (
Meta,
MetadataWriter,
)
from cic.attachment import Attachment
from cic.network import Network
from cic.token import Token
from typing import Optional
logg = logging.getLogger(__name__)
def init_writers_from_config(config):
    """Resolve the writer objects named in the configuration.

    For each of the writer roles ``meta``, ``attachment``, ``proof`` and
    ``ext``, the setting ``CIC_CORE_<ROLE>_WRITER`` is read from ``config``
    and interpreted as a dotted path ``<module>.<attribute>``. The module is
    imported and the named attribute is looked up on it.

    :param config: Object exposing ``get(key)`` that returns the dotted path
        for each writer setting.
    :raises ValueError: If a writer setting is unset or is not a dotted path.
    :raises ImportError: If the module part of a path cannot be imported.
    :raises AttributeError: If the module lacks the named attribute.
    :return: Mapping of role name to the resolved (uninstantiated) object.
    :rtype: dict
    """
    writers = {}
    for role in ('meta', 'attachment', 'proof', 'ext'):
        key = 'CIC_CORE_{}_WRITER'.format(role.upper())
        dotted_path = config.get(key)
        # Fail with the offending key named, rather than an opaque
        # AttributeError/unpacking error from rsplit on a bad value.
        if not isinstance(dotted_path, str) or '.' not in dotted_path:
            raise ValueError(
                'config {} must be a dotted path "<module>.<attribute>", got {!r}'.format(
                    key, dotted_path,
                )
            )
        (module_name, attribute_name) = dotted_path.rsplit('.', maxsplit=1)
        module = importlib.import_module(module_name)
        writers[role] = getattr(module, attribute_name)
    return writers
def deploy(
    config,
    target: str,
    contract_directory: str,
    metadata_endpoint: Optional[str],
    keystore_directory: str,
    key_file_path: str,
    gpg_passphrase: str,
):
    """Load the contract components from a directory and process them for a target.

    The extension module ``cic.ext.<target>`` is imported and used to build
    the rpc/signer pair and the extension object. Token, metadata,
    attachment, proof and network components are loaded from
    ``contract_directory`` and handed to a ``Processor``.

    :param config: Configuration object; read for the ``CIC_CORE_*_WRITER``
        settings and ``CHAIN_SPEC``. ``CHAIN_SPEC`` is added to it when missing.
    :param target: Name of the extension module under ``cic.ext``; also used
        to look up the network resource and chain spec.
    :param contract_directory: Directory the components are loaded from.
        Local outputs are written to its ``out`` subdirectory.
    :param metadata_endpoint: If not ``None``, meta, attachment and proof
        writers are replaced with the metadata/HTTP writers pointed at this
        endpoint; otherwise the writers named in ``config`` are used.
    :param keystore_directory: Passed to the extension's ``parse_adapter``.
    :param key_file_path: Assigned to ``MetadataSigner.key_file_path`` when a
        metadata endpoint is given.
    :param gpg_passphrase: Assigned to ``MetadataSigner.gpg_passphrase`` when
        a metadata endpoint is given.
    """
    modname = 'cic.ext.{}'.format(target)
    cmd_mod = importlib.import_module(modname)

    writers = init_writers_from_config(config)
    output_directory = os.path.join(contract_directory, 'out')
    output_writer_path_meta = output_directory

    if metadata_endpoint is not None:
        # NOTE: these are class-level attributes, so the settings apply
        # process-wide, not just to this deployment.
        MetadataRequestsHandler.base_url = metadata_endpoint
        MetadataSigner.gpg_path = '/tmp'
        MetadataSigner.key_file_path = key_file_path
        MetadataSigner.gpg_passphrase = gpg_passphrase
        writers['proof'] = KeyedWriterFactory(MetadataWriter, HTTPWriter).new
        writers['attachment'] = KeyedWriterFactory(None, HTTPWriter).new
        writers['meta'] = MetadataWriter
        output_writer_path_meta = metadata_endpoint

    ct = Token(path=contract_directory)
    cm = Meta(path=contract_directory, writer=writers['meta'](path=output_writer_path_meta))
    ca = Attachment(path=contract_directory, writer=writers['attachment'](path=output_writer_path_meta))
    cp = Proof(path=contract_directory, attachments=ca, writer=writers['proof'](path=output_writer_path_meta))
    cn = Network(path=contract_directory)

    ca.load()
    ct.load()
    cp.load()
    cm.load()
    cn.load()

    # Make sure CHAIN_SPEC is present in config before the adapter is parsed.
    try:
        config.get('CHAIN_SPEC')
    except KeyError:
        # Fall back to the chain spec the network settings declare for this
        # target. The method must be called; assigning the bound method
        # itself would store a function object in the config.
        # NOTE(review): the value is stored as returned by cn.chain_spec();
        # confirm whether config consumers expect its string form instead.
        chain_spec = cn.chain_spec(target)
        config.add(chain_spec, 'CHAIN_SPEC', exists_ok=True)
        logg.debug(f'CHAIN_SPEC config set to {str(chain_spec)}')

    (rpc, signer) = cmd_mod.parse_adapter(config, keystore_directory)

    ref = cn.resource(target)
    chain_spec = cn.chain_spec(target)
    logg.debug('found reference {} chain spec {} for target {}'.format(ref['contents'], chain_spec, target))

    c = cmd_mod.new(
        chain_spec,
        ref['contents'],
        cp,
        signer_hint=signer,
        rpc=rpc,
        outputs_writer=writers['ext'](path=output_directory),
    )
    c.apply_token(ct)

    p = Processor(proof=cp, attachment=ca, metadata=cm, extensions=[c])
    p.process()