# import notifier
# NOTE: kept ahead of the other imports, as in the original file, so the
# module-level ``notifier`` instance exists before anything else is loaded.
from clicada.cli.notify import NotifyWriter

notifier = NotifyWriter()
# notifier.notify('loading script')

# standard imports
import importlib
import logging
import os
import sys

# external imports
import chainlib.eth.cli
import confini
from chainlib.chain import ChainSpec

# local imports
import clicada.cli.tag as cmd_tag
import clicada.cli.user as cmd_user
from clicada.cli.auth import PGPAuthCrypt
from clicada.cli.http import HTTPSession, PGPClientSession
from clicada.crypt.aes import AESCTREncrypt

logg = logging.getLogger()

script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, "..", "data")
base_config_dir = os.path.join(data_dir, "config")


class NullWriter:
    """Writer that drops notifications and errors but still emits output.

    Offers the same three methods ``CmdCtrl`` itself exposes (``notify``,
    ``ouch``, ``write``); only ``write`` produces anything.
    """

    def notify(self, v):
        """Ignore the notification ``v``."""
        pass

    def ouch(self, v):
        """Ignore the error message ``v``."""
        pass

    def write(self, v):
        """Write ``str(v)`` to standard output without a trailing newline."""
        sys.stdout.write(str(v))


class CmdCtrl:
    """Command controller for the CLI.

    Construction parses arguments, configures logging, imports the module
    implementing the selected sub-command, loads configuration, sets up
    authentication and connects the RPC backend. ``execute`` then hands
    control to the sub-command module.
    """

    # Short sub-command aliases mapped to their canonical module names.
    __cmd_alias = {
        "u": "user",
        "t": "tag",
    }

    # Not referenced anywhere in this file.
    __auth_for = [
        "user",
    ]

    def __init__(self, argv=None, description=None, logger=None, *args, **kwargs):
        """Run the whole setup sequence for one CLI invocation.

        :param argv: Argument list handed to the argument parser.
        :param description: Accepted for compatibility; unused in this file.
        :param logger: Logger to use; the root logger is used when ``None``.
        :param args: Accepted for compatibility; unused in this file.
        :param kwargs: Accepted for compatibility; unused in this file.
        :raises NotImplementedError: If ``AUTH_TYPE`` is not ``gnupg``.
        """
        # Order matters: every step reads state produced by earlier ones
        # (parsed args -> logger -> command module -> config -> auth).
        self.args(argv)
        self.logging(logger)
        self.module()
        self.config()
        self.notifier()
        self.auth()
        self.blockchain()

        self.remote_openers = {}
        meta_url = self.get("META_URL")
        if meta_url is not None:
            auth_client_session = PGPClientSession(self.__auth)
            self.remote_openers["meta"] = HTTPSession(
                meta_url,
                auth=auth_client_session,
                origin=self.config.get("META_HTTP_ORIGIN"),
            )

    def blockchain(self):
        """Build the chain spec from ``CHAIN_SPEC`` and connect the RPC."""
        self.chain_spec = ChainSpec.from_chain_str(self.config.get("CHAIN_SPEC"))
        self.rpc = chainlib.eth.cli.Rpc()
        self.__conn = self.rpc.connect_by_config(self.config)

    def args(self, argv):
        """Create the argument parser with its sub-commands and parse ``argv``.

        Stores the parser in ``self.argparser`` and the parsed namespace in
        ``self.cmd_args``; the chosen sub-command lands in
        ``self.cmd_args.command``.
        """
        self.argparser = chainlib.eth.cli.ArgumentParser(
            chainlib.eth.cli.argflag_std_read
        )

        sub = self.argparser.add_subparsers()
        sub.dest = "command"

        sub_user = sub.add_parser(
            "user", aliases=["u"], help="retrieve transactions for a user"
        )
        cmd_user.process_args(sub_user)

        sub_tag = sub.add_parser(
            "tag", aliases=["t"], help="locally assign a display value to an identifier"
        )
        cmd_tag.process_args(sub_tag)

        self.cmd_args = self.argparser.parse_args(argv)

    def module(self):
        """Resolve the sub-command name and import ``clicada.cli.<name>``.

        Aliases are expanded through the alias table; when no sub-command
        was given the module name ``none`` is used.
        """
        self.cmd_string = self.cmd_args.command
        cmd_string_translate = self.__cmd_alias.get(self.cmd_string)
        if cmd_string_translate is not None:
            self.cmd_string = cmd_string_translate

        if self.cmd_string is None:
            self.cmd_string = "none"

        modname = "clicada.cli.{}".format(self.cmd_string)
        self.logger.debug("using module {}".format(modname))
        self.cmd_mod = importlib.import_module(modname)

    def logging(self, logger):
        """Select the logger and apply the verbosity flags.

        :param logger: Logger to use; the root logger is used when ``None``.

        ``vv`` sets DEBUG, otherwise ``v`` sets INFO; with neither flag the
        logger level is left untouched.
        """
        # Inside this method ``logging`` is the stdlib module: the method of
        # the same name is only reachable through ``self``.
        self.logger = logger
        if self.logger is None:
            self.logger = logging.getLogger()

        if self.cmd_args.vv:
            self.logger.setLevel(logging.DEBUG)
        elif self.cmd_args.v:
            self.logger.setLevel(logging.INFO)

    def config(self):
        """Load the configuration into ``self.config``.

        When no config directory was given on the command line and ``HOME``
        is set, ``$HOME/.config/cic/clicada`` is used as the override
        directory if it exists.

        NOTE: the assignment to ``self.config`` shadows this method on the
        instance, so it can only be called once per object.
        """
        override_dir = self.cmd_args.config
        if override_dir is None:
            p = os.environ.get("HOME")
            if p is not None:
                p = os.path.join(p, ".config", "cic", "clicada")
                try:
                    # Existence probe only; the stat result itself is unused.
                    os.stat(p)
                    override_dir = p
                    logg.info(
                        "applying user config override from standard location: {}".format(
                            p
                        )
                    )
                except FileNotFoundError:
                    # No user override directory: keep the base config only.
                    pass

        extra_args = self.cmd_mod.extra_args()
        self.config = chainlib.eth.cli.Config.from_args(
            self.cmd_args,
            base_config_dir=base_config_dir,
            extra_args=extra_args,
            default_config_dir=override_dir,
        )
        self.config.add(False, "_SEQ")
        # Presumably keeps the passphrase out of the debug dump below.
        self.config.censor("AUTH_PASSPHRASE")

        self.logger.debug("loaded config:\n{}".format(self.config))

    def auth(self):
        """Set up PGP authentication and the local encrypter.

        :raises NotImplementedError: If ``AUTH_TYPE`` is not ``gnupg``.

        The auth database path comes from ``AUTH_DB_PATH`` and falls back to
        ``$HOME/.local/share/cic/clicada``; the fallback is ``None`` when
        ``HOME`` is not set.
        """
        typ = self.get("AUTH_TYPE")
        if typ != "gnupg":
            raise NotImplementedError("Valid auth implementations are: gnupg")

        default_auth_db_path = None
        home = os.environ.get("HOME")
        if home is not None:
            default_auth_db_path = os.path.join(home, ".local/share/cic/clicada")

        auth_db_path = self.get("AUTH_DB_PATH", default_auth_db_path)
        self.__auth = PGPAuthCrypt(
            auth_db_path, self.get("AUTH_KEY"), self.get("AUTH_KEYRING_PATH")
        )
        self.__auth.get_secret(self.get("AUTH_PASSPHRASE"))
        self.encrypter = AESCTREncrypt(auth_db_path, self.__auth.secret)

    def get(self, k, default=None):
        """Return the config value for ``k``.

        :param k: Configuration key.
        :param default: Value passed to the config lookup as its default.
        :returns: The stored value. ``_FORCE`` is handled as a boolean:
            ``False`` when unset, otherwise ``self.config.true(k)``.
        """
        r = self.config.get(k, default)
        if k in [
            "_FORCE",
        ]:
            if r is None:
                return False
            return self.config.true(k)
        return r

    def chain(self):
        """Return the chain spec created by ``blockchain``."""
        return self.chain_spec

    def conn(self):
        """Return the RPC connection created by ``blockchain``."""
        return self.__conn

    def execute(self):
        """Run the selected sub-command module with this controller."""
        self.cmd_mod.execute(self)

    def opener(self, k):
        """Return the remote opener registered under ``k``.

        :raises KeyError: If no opener is registered for ``k``.
        """
        return self.remote_openers[k]

    def notifier(self):
        """Choose ``self.writer`` from the root logger level.

        At WARNING or above, logging is disabled and the module-level
        ``notifier`` is used; otherwise a ``NullWriter`` is installed.
        """
        if logg.root.level >= logging.WARNING:
            # Called without a level: relies on the stdlib default argument.
            logging.disable()
            self.writer = notifier
        else:
            self.writer = NullWriter()

    def notify(self, v):
        """Print ``v`` in bright cyan when the root level is INFO or lower."""
        if logg.root.level <= logging.INFO:
            # format() instead of "+" so non-string values do not raise.
            print("\033[96m{}\033[0m".format(v))

    def ouch(self, v):
        """Print ``v`` in bright red, regardless of log level."""
        # format() instead of "+" so non-string values do not raise.
        print("\033[91m{}\033[0m".format(v))

    def write(self, v):
        """Print ``v`` followed by a newline."""
        print(v)