# standard imports
import os
#import argparse
import logging
import importlib

# external imports
import confini
import chainlib.eth.cli
from chainlib.chain import ChainSpec

# local imports
import clicada.cli.user as cmd_user
import clicada.cli.tag as cmd_tag
from clicada.cli.auth import PGPAuthCrypt
from clicada.cli.http import (
        HTTPSession,
        PGPClientSession,
        )

# Bundled base configuration lives in ../data/config relative to this file.
script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, '..', 'data')
base_config_dir = os.path.join(data_dir, 'config')


class CmdCtrl:
    """Controller tying together argument parsing, configuration,
    authentication and RPC connection for a single CLI invocation.

    Constructing an instance runs the complete setup sequence; the selected
    sub-command is then run with :meth:`execute`.
    """

    # Maps sub-command aliases to the name of the module implementing them.
    __cmd_alias = {
            'u': 'user',
            't': 'tag',
            }

    # Not referenced anywhere in this class.
    # NOTE(review): presumably the commands that require auth - confirm.
    __auth_for = [
            'user',
            ]


    def __init__(self, argv=None, description=None, logger=None, *args, **kwargs):
        """Run all setup steps for the command line given in ``argv``.

        :param argv: Argument list handed to the argument parser; ``None``
            lets the parser fall back to its own default.
        :param description: Accepted but currently unused.
        :param logger: Logger to use; the root logger is used when ``None``.
        :param args: Accepted but currently unused.
        :param kwargs: Accepted but currently unused.
        """
        # Order matters: each step reads attributes set by the previous ones
        # (cmd_args -> logger -> cmd_mod -> config -> auth -> rpc).
        self.args(argv)
        self.logging(logger)
        self.module()
        self.config()
        self.auth()
        self.blockchain()

        # Remote sessions keyed by name; 'meta' exists only when META_URL is set.
        self.remote_openers = {}
        meta_url = self.get('META_URL')
        if meta_url is not None:
            auth_client_session = PGPClientSession(self.__auth)
            self.remote_openers['meta'] = HTTPSession(meta_url, auth=auth_client_session)


    def blockchain(self):
        """Create the chain spec from ``CHAIN_SPEC`` and connect the RPC
        using the loaded configuration.
        """
        self.chain_spec = ChainSpec.from_chain_str(self.config.get('CHAIN_SPEC'))
        self.rpc = chainlib.eth.cli.Rpc()
        self.__conn = self.rpc.connect_by_config(self.config)


    def args(self, argv):
        """Build the argument parser with its sub-commands and parse ``argv``.

        The parsed namespace is stored in ``self.cmd_args``; the sub-command
        given on the command line ends up in ``self.cmd_args.command``.

        :param argv: Argument list to parse.
        """
        self.argparser = chainlib.eth.cli.ArgumentParser(chainlib.eth.cli.argflag_std_read)
        sub = self.argparser.add_subparsers()
        sub.dest = 'command'

        sub_user = sub.add_parser('user', aliases=['u'], help='retrieve transactions for a user')
        cmd_user.process_args(sub_user)

        sub_tag = sub.add_parser('tag', aliases=['t'], help='locally assign a display value to an identifier')
        cmd_tag.process_args(sub_tag)

        self.cmd_args = self.argparser.parse_args(argv)


    def module(self):
        """Resolve the chosen sub-command to its implementing module and
        import it as ``self.cmd_mod``.

        Aliases are translated to their full command name first. When no
        sub-command was given, the module name ``none`` is used.
        """
        self.cmd_string = self.cmd_args.command
        cmd_string_translate = self.__cmd_alias.get(self.cmd_string)
        if cmd_string_translate is not None:
            self.cmd_string = cmd_string_translate

        if self.cmd_string is None:
            self.cmd_string = 'none'

        modname = 'clicada.cli.{}'.format(self.cmd_string)
        self.logger.debug('using module {}'.format(modname))
        self.cmd_mod = importlib.import_module(modname)


    def logging(self, logger):
        """Set ``self.logger`` and apply the verbosity flags from the parsed
        arguments (``vv`` -> DEBUG, ``v`` -> INFO).

        :param logger: Logger to use; the root logger is used when ``None``.
        """
        # Inside this body the bare name ``logging`` is the stdlib module:
        # class attributes (including this method) are not in function scope.
        self.logger = logger
        if self.logger is None:
            self.logger = logging.getLogger()

        if self.cmd_args.vv:
            self.logger.setLevel(logging.DEBUG)
        elif self.cmd_args.v:
            self.logger.setLevel(logging.INFO)


    def config(self):
        """Load the configuration from the parsed arguments.

        A config directory given on the command line is passed on as
        ``default_config_dir``; the sub-command module contributes its own
        extra argument mappings through ``extra_args()``.
        """
        from_args_kwargs = {
                'base_config_dir': base_config_dir,
                'extra_args': self.cmd_mod.extra_args(),
                }
        if self.cmd_args.config:
            from_args_kwargs['default_config_dir'] = self.cmd_args.config

        # This assignment replaces the bound method ``config`` on the
        # instance with the loaded config object, so this method can only be
        # called once per instance.
        self.config = chainlib.eth.cli.Config.from_args(self.cmd_args, **from_args_kwargs)

        self.config.add(False, '_SEQ')

        # Censor the passphrase before the config is written to the log below.
        self.config.censor('AUTH_PASSPHRASE')

        self.logger.debug('loaded config:\n{}'.format(self.config))


    def auth(self):
        """Set up the authentication backend and load its secret.

        :raises NotImplementedError: If ``AUTH_TYPE`` is anything but ``gnupg``.
        """
        typ = self.get('AUTH_TYPE')
        if typ != 'gnupg':
            raise NotImplementedError('Valid auth implementations are: gnupg')

        # expanduser() equals $HOME when that is set, but unlike a direct
        # os.environ['HOME'] lookup it does not raise KeyError when it is not.
        default_auth_db_path = os.path.join(os.path.expanduser('~'), '.clicada', 'auth')
        auth_db_path = self.get('AUTH_DB_PATH', default_auth_db_path)

        self.__auth = PGPAuthCrypt(auth_db_path, self.get('AUTH_KEY'), self.get('AUTH_KEYRING_PATH'))
        self.__auth.get_secret(self.get('AUTH_PASSPHRASE'))


    def get(self, k, default=None):
        """Return the configuration value stored under ``k``.

        The key ``_FORCE`` is treated as a boolean flag: ``False`` when it has
        no value, otherwise the result of the config's own truth evaluation.

        :param k: Configuration key.
        :param default: Value passed to the config lookup as fallback.
        :return: The configuration value, or a bool for flag keys.
        """
        r = self.config.get(k, default)
        if k in ('_FORCE',):
            if r is None:
                return False
            return self.config.true(k)
        return r


    def chain(self):
        """Return the chain spec created by :meth:`blockchain`."""
        return self.chain_spec


    def conn(self):
        """Return the RPC connection created by :meth:`blockchain`."""
        return self.__conn


    def execute(self):
        """Run the selected sub-command module, passing it this controller."""
        self.cmd_mod.execute(self)


    def opener(self, k):
        """Return the remote session registered under ``k``.

        :param k: Session name, e.g. ``meta``.
        :raises KeyError: If no session is registered under ``k``.
        """
        return self.remote_openers[k]