From a3a3f552341f1e6ab00e8d7c6a49dae06029cf13 Mon Sep 17 00:00:00 2001 From: William Luke Date: Mon, 2 May 2022 08:23:55 +0300 Subject: [PATCH] fix: disable notify --- clicada/cli/arg.py | 146 ++++++++++++++++++++++---------------------- clicada/cli/user.py | 122 +++++++++++++++++++++++------------- setup.cfg | 2 +- 3 files changed, 155 insertions(+), 115 deletions(-) diff --git a/clicada/cli/arg.py b/clicada/cli/arg.py index 2ea2cb2..68db07a 100644 --- a/clicada/cli/arg.py +++ b/clicada/cli/arg.py @@ -1,46 +1,43 @@ # import notifier from clicada.cli.notify import NotifyWriter + notifier = NotifyWriter() -#notifier.notify('loading script') +# notifier.notify('loading script') + +import importlib +import logging # standard imports import os -import logging -import importlib import sys -# external imports -import confini import chainlib.eth.cli -from chainlib.chain import ChainSpec +import clicada.cli.tag as cmd_tag # local imports import clicada.cli.user as cmd_user -import clicada.cli.tag as cmd_tag + +# external imports +import confini +from chainlib.chain import ChainSpec from clicada.cli.auth import PGPAuthCrypt -from clicada.cli.http import ( - HTTPSession, - PGPClientSession, - ) +from clicada.cli.http import HTTPSession, PGPClientSession from clicada.crypt.aes import AESCTREncrypt logg = logging.getLogger() script_dir = os.path.dirname(os.path.realpath(__file__)) -data_dir = os.path.join(script_dir, '..', 'data') -base_config_dir = os.path.join(data_dir, 'config') +data_dir = os.path.join(script_dir, "..", "data") +base_config_dir = os.path.join(data_dir, "config") class NullWriter: - def notify(self, v): pass - def ouch(self, v): pass - def write(self, v): sys.stdout.write(str(v)) @@ -48,13 +45,13 @@ class NullWriter: class CmdCtrl: __cmd_alias = { - 'u': 'user', - 't': 'tag', - } + "u": "user", + "t": "tag", + } __auth_for = [ - 'user', - ] + "user", + ] def __init__(self, argv=None, description=None, logger=None, *args, **kwargs): self.args(argv) @@ -62,7 +59,7 @@ class CmdCtrl: self.logging(logger) self.module() - + self.config() self.notifier() @@ -72,29 +69,36 @@ class CmdCtrl: self.blockchain() self.remote_openers = {} - if self.get('META_URL') != None: + if self.get("META_URL") != None: auth_client_session = PGPClientSession(self.__auth) - self.remote_openers['meta'] = HTTPSession(self.get('META_URL'), auth=auth_client_session, origin=self.config.get('META_HTTP_ORIGIN')) - + self.remote_openers["meta"] = HTTPSession( + self.get("META_URL"), + auth=auth_client_session, + origin=self.config.get("META_HTTP_ORIGIN"), + ) def blockchain(self): - self.chain_spec = ChainSpec.from_chain_str(self.config.get('CHAIN_SPEC')) + self.chain_spec = ChainSpec.from_chain_str(self.config.get("CHAIN_SPEC")) self.rpc = chainlib.eth.cli.Rpc() self.__conn = self.rpc.connect_by_config(self.config) - def args(self, argv): - self.argparser = chainlib.eth.cli.ArgumentParser(chainlib.eth.cli.argflag_std_read) + self.argparser = chainlib.eth.cli.ArgumentParser( + chainlib.eth.cli.argflag_std_read + ) sub = self.argparser.add_subparsers() - sub.dest = 'command' - sub_user = sub.add_parser('user', aliases=['u'], help='retrieve transactions for a user') + sub.dest = "command" + sub_user = sub.add_parser( + "user", aliases=["u"], help="retrieve transactions for a user" + ) cmd_user.process_args(sub_user) - sub_tag = sub.add_parser('tag', aliases=['t'], help='locally assign a display value to an identifier') + sub_tag = sub.add_parser( + "tag", aliases=["t"], help="locally assign a display value to an identifier" + ) cmd_tag.process_args(sub_tag) self.cmd_args = self.argparser.parse_args(argv) - def module(self): self.cmd_string = self.cmd_args.command cmd_string_translate = self.__cmd_alias.get(self.cmd_string) @@ -102,12 +106,11 @@ class CmdCtrl: self.cmd_string = cmd_string_translate if self.cmd_string == None: - self.cmd_string = 'none' - - modname = 'clicada.cli.{}'.format(self.cmd_string) - self.logger.debug('using module {}'.format(modname)) - self.cmd_mod = importlib.import_module(modname) + self.cmd_string = "none" + modname = "clicada.cli.{}".format(self.cmd_string) + self.logger.debug("using module {}".format(modname)) + self.cmd_mod = importlib.import_module(modname) def logging(self, logger): self.logger = logger @@ -117,70 +120,75 @@ class CmdCtrl: self.logger.setLevel(logging.DEBUG) elif self.cmd_args.v: self.logger.setLevel(logging.INFO) - def config(self): override_dir = self.cmd_args.config if override_dir == None: - p = os.environ.get('HOME') + p = os.environ.get("HOME") if p != None: - p = os.path.join(p, '.config', 'cic', 'clicada') + p = os.path.join(p, ".config", "cic", "clicada") try: os.stat(p) override_dir = p - logg.info('applying user config override from standard location: {}'.format(p)) + logg.info( + "applying user config override from standard location: {}".format( + p + ) + ) except FileNotFoundError: pass extra_args = self.cmd_mod.extra_args() - self.config = chainlib.eth.cli.Config.from_args(self.cmd_args, base_config_dir=base_config_dir, extra_args=extra_args, default_config_dir=override_dir) + self.config = chainlib.eth.cli.Config.from_args( + self.cmd_args, + base_config_dir=base_config_dir, + extra_args=extra_args, + default_config_dir=override_dir, + ) - self.config.add(False, '_SEQ') + self.config.add(False, "_SEQ") - self.config.censor('AUTH_PASSPHRASE') - - self.logger.debug('loaded config:\n{}'.format(self.config)) + self.config.censor("AUTH_PASSPHRASE") + self.logger.debug("loaded config:\n{}".format(self.config)) def auth(self): - typ = self.get('AUTH_TYPE') - if typ != 'gnupg': - raise NotImplementedError('Valid aut implementations are: gnupg') + typ = self.get("AUTH_TYPE") + if typ != "gnupg": + raise NotImplementedError("Valid aut implementations are: gnupg") default_auth_db_path = None - if os.environ.get('HOME') != None: - default_auth_db_path = os.path.join(os.environ['HOME'], '.local/share/cic/clicada') - auth_db_path = self.get('AUTH_DB_PATH', default_auth_db_path) - self.__auth = PGPAuthCrypt(auth_db_path, self.get('AUTH_KEY'), self.get('AUTH_KEYRING_PATH')) - self.__auth.get_secret(self.get('AUTH_PASSPHRASE')) + if os.environ.get("HOME") != None: + default_auth_db_path = os.path.join( + os.environ["HOME"], ".local/share/cic/clicada" + ) + auth_db_path = self.get("AUTH_DB_PATH", default_auth_db_path) + self.__auth = PGPAuthCrypt( + auth_db_path, self.get("AUTH_KEY"), self.get("AUTH_KEYRING_PATH") + ) + self.__auth.get_secret(self.get("AUTH_PASSPHRASE")) self.encrypter = AESCTREncrypt(auth_db_path, self.__auth.secret) - def get(self, k, default=None): r = self.config.get(k, default) if k in [ - '_FORCE', - ]: + "_FORCE", + ]: if r == None: return False return self.config.true(k) return r - def chain(self): return self.chain_spec - def conn(self): return self.__conn - def execute(self): self.cmd_mod.execute(self) - def opener(self, k): return self.remote_openers[k] - def notifier(self): if logg.root.level >= logging.WARNING: logging.disable() @@ -188,18 +196,12 @@ class CmdCtrl: else: self.writer = NullWriter() - def notify(self, v): - self.writer.notify(v) - + if logg.root.level <= logging.INFO: + print(v) def ouch(self, v): - self.writer.ouch(v) - print() - + print(v) def write(self, v): - self.writer.write("") - self.writer.write(v) - print() - + print(v) diff --git a/clicada/cli/user.py b/clicada/cli/user.py index 7221f26..d109a4c 100644 --- a/clicada/cli/user.py +++ b/clicada/cli/user.py @@ -7,12 +7,14 @@ from pathlib import Path from chainlib.encode import TxHexNormalizer from chainlib.eth.address import is_address, to_checksum_address + # external imports from cic_eth_registry import CICRegistry from cic_eth_registry.lookup.tokenindex import TokenIndexLookup from cic_types.models.person import Person from clicada.error import MetadataNotFoundError from clicada.token import FileTokenStore, token_balance + # local imports from clicada.tx import ResolvedTokenTx, TxGetter from clicada.user import FileUserStore @@ -24,24 +26,36 @@ tx_normalizer = TxHexNormalizer() def process_args(argparser): - argparser.add_argument('-m', '--method', type=str, help='lookup method') - argparser.add_argument('--meta-url', dest='meta_url', type=str, help='Url to retrieve metadata from') - argparser.add_argument('-f', '--force-update', dest='force_update', action='store_true', help='Update records of mutable entries') - argparser.add_argument('identifier', type=str, help='user identifier (phone_number or address)') + argparser.add_argument("-m", "--method", type=str, help="lookup method") + argparser.add_argument( + "--meta-url", dest="meta_url", type=str, help="Url to retrieve metadata from" + ) + argparser.add_argument( + "-f", + "--force-update", + dest="force_update", + action="store_true", + help="Update records of mutable entries", + ) + argparser.add_argument( + "identifier", type=str, help="user identifier (phone_number or address)" + ) def extra_args(): return { - 'force_update': '_FORCE', - 'method': 'META_LOOKUP_METHOD', - 'meta_url': 'META_URL', - 'identifier': '_IDENTIFIER', - } + "force_update": "_FORCE", + "method": "META_LOOKUP_METHOD", + "meta_url": "META_URL", + "identifier": "_IDENTIFIER", + } def apply_args(config, args): - if config.get('META_LOOKUP_METHOD'): - raise NotImplementedError('Sorry, currently only "phone" lookup method is implemented') + if config.get("META_LOOKUP_METHOD"): + raise NotImplementedError( + 'Sorry, currently only "phone" lookup method is implemented' + ) def validate(config, args): @@ -49,20 +63,27 @@ def validate(config, args): def execute(ctrl): - tx_getter = TxGetter(ctrl.get('TX_CACHE_URL'), 10) + tx_getter = TxGetter(ctrl.get("TX_CACHE_URL"), 10) - store_path = os.path.join(str(Path.home()), '.clicada') - user_phone_file_label = 'phone' - user_phone_store = FileUserStore(ctrl.opener('meta'), ctrl.chain(), user_phone_file_label, store_path, int(ctrl.get('FILESTORE_TTL')), encrypter=ctrl.encrypter) + store_path = os.path.join(str(Path.home()), ".clicada") + user_phone_file_label = "phone" + user_phone_store = FileUserStore( + ctrl.opener("meta"), + ctrl.chain(), + user_phone_file_label, + store_path, + int(ctrl.get("FILESTORE_TTL")), + encrypter=ctrl.encrypter, + ) - identifier = ctrl.get('_IDENTIFIER') - ctrl.notify('resolving identifier {} to wallet address'.format(identifier)) + identifier = ctrl.get("_IDENTIFIER") + ctrl.notify("resolving identifier {} to wallet address".format(identifier)) if is_address(identifier): user_address = identifier else: - user_address = user_phone_store.by_phone(identifier, update=ctrl.get('_FORCE')) + user_address = user_phone_store.by_phone(identifier, update=ctrl.get("_FORCE")) if user_address == None: - ctrl.ouch('unknown identifier: {}\n'.format(identifier)) + ctrl.ouch("unknown identifier: {}\n".format(identifier)) sys.exit(1) try: user_address = to_checksum_address(user_address) @@ -70,52 +91,69 @@ def execute(ctrl): ctrl.ouch('invalid response "{}" for {}\n'.format(user_address, identifier)) sys.exit(1) - logg.debug('loaded user address {} for {}'.format(user_address, identifier)) + logg.debug("loaded user address {} for {}".format(user_address, identifier)) user_address_normal = tx_normalizer.wallet_address(user_address) - ctrl.notify('retrieving txs for address {}'.format(user_address_normal)) + ctrl.notify("retrieving txs for address {}".format(user_address_normal)) txs = tx_getter.get(user_address) - token_store = FileTokenStore(ctrl.chain(), ctrl.conn(), 'token', store_path) + token_store = FileTokenStore(ctrl.chain(), ctrl.conn(), "token", store_path) - user_address_file_label = 'address' - user_address_store = FileUserStore(ctrl.opener('meta'), ctrl.chain(), user_address_file_label, store_path, int(ctrl.get('FILESTORE_TTL')), encrypter=ctrl.encrypter) + user_address_file_label = "address" + user_address_store = FileUserStore( + ctrl.opener("meta"), + ctrl.chain(), + user_address_file_label, + store_path, + int(ctrl.get("FILESTORE_TTL")), + encrypter=ctrl.encrypter, + ) - ctrl.notify('resolving metadata for address {}'.format(user_address_normal)) + ctrl.notify("resolving metadata for address {}".format(user_address_normal)) r = None try: - r = user_address_store.by_address(user_address_normal, update=ctrl.get('_FORCE')) + r = user_address_store.by_address( + user_address_normal, update=ctrl.get("_FORCE") + ) except MetadataNotFoundError as e: - ctrl.ouch('could not resolve metadata for user: {}'.format(e)) + ctrl.ouch("could not resolve metadata for user: {}".format(e)) - ctrl.write(f"""Phone: {ctrl.get('_IDENTIFIER')} + ctrl.write( + f"""Phone: {ctrl.get("_IDENTIFIER")} Network address: {add_0x(user_address)} Chain: {ctrl.chain().common_name()} -Name: {str(r)} -Registered: {r and datetime.datetime.fromtimestamp(r).ctime()} -Gender: {r and r.gender} -Location: {r and r.location['area_name']} -Products: {r and ','.join(r.products)} -Tags: {r and ','.join(r.tags)}""" -) +Name: { str(r)} +Registered: {datetime.datetime.fromtimestamp(r.date_registered).ctime()} +Gender: {r.gender} +Location: {r.location["area_name"]} +Products: {",".join(r.products)} +Tags: {",".join(r.tags)}""" + ) tx_lines = [] seen_tokens = {} - for tx_src in txs['data']: - ctrl.notify('resolve details for tx {}'.format(tx_src['tx_hash'])) + for tx_src in txs["data"]: + ctrl.notify("resolve details for tx {}".format(tx_src["tx_hash"])) tx = ResolvedTokenTx.from_dict(tx_src) - tx.resolve(token_store, user_address_store, show_decimals=True, update=ctrl.get('_FORCE')) + tx.resolve( + token_store, + user_address_store, + show_decimals=True, + update=ctrl.get("_FORCE"), + ) tx_lines.append(tx) seen_tokens[tx.source_token_label] = tx.source_token seen_tokens[tx.destination_token_label] = tx.destination_token for k in seen_tokens.keys(): - ctrl.notify('resolve token {}'.format(seen_tokens[k])) + ctrl.notify("resolve token {}".format(seen_tokens[k])) (token_symbol, token_decimals) = token_store.by_address(seen_tokens[k]) - ctrl.notify('get token balance for {} => {}'.format(token_symbol, seen_tokens[k])) + ctrl.notify( + "get token balance for {} => {}".format(token_symbol, seen_tokens[k]) + ) balance = token_balance(ctrl.chain(), ctrl.conn(), seen_tokens[k], user_address) - fmt = '{:.' + str(token_decimals) + 'f}' - decimal_balance = fmt.format(balance / (10 ** token_decimals)) + fmt = "{:." + str(token_decimals) + "f}" + decimal_balance = fmt.format(balance / (10**token_decimals)) ctrl.write("Balances:\n {} {}".format(token_symbol, decimal_balance)) print() diff --git a/setup.cfg b/setup.cfg index 9fc189c..c509dea 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = clicada -version = 0.1.1 +version = 0.1.2 description = CLI CRM tool for the cic-stack custodial wallet system author = Louis Holbrook author_email = dev@holbrook.no