Compare commits

...

7 Commits

6 changed files with 189 additions and 155 deletions

3
.gitignore vendored
View File

@ -4,4 +4,5 @@ build/
*.pyc *.pyc
.venv .venv
.clicada .clicada
dist/ dist/
.vscode/

View File

@ -1,46 +1,43 @@
# import notifier # import notifier
from clicada.cli.notify import NotifyWriter from clicada.cli.notify import NotifyWriter
notifier = NotifyWriter() notifier = NotifyWriter()
#notifier.notify('loading script') # notifier.notify('loading script')
import importlib
import logging
# standard imports # standard imports
import os import os
import logging
import importlib
import sys import sys
# external imports
import confini
import chainlib.eth.cli import chainlib.eth.cli
from chainlib.chain import ChainSpec import clicada.cli.tag as cmd_tag
# local imports # local imports
import clicada.cli.user as cmd_user import clicada.cli.user as cmd_user
import clicada.cli.tag as cmd_tag
# external imports
import confini
from chainlib.chain import ChainSpec
from clicada.cli.auth import PGPAuthCrypt from clicada.cli.auth import PGPAuthCrypt
from clicada.cli.http import ( from clicada.cli.http import HTTPSession, PGPClientSession
HTTPSession,
PGPClientSession,
)
from clicada.crypt.aes import AESCTREncrypt from clicada.crypt.aes import AESCTREncrypt
logg = logging.getLogger() logg = logging.getLogger()
script_dir = os.path.dirname(os.path.realpath(__file__)) script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, '..', 'data') data_dir = os.path.join(script_dir, "..", "data")
base_config_dir = os.path.join(data_dir, 'config') base_config_dir = os.path.join(data_dir, "config")
class NullWriter: class NullWriter:
def notify(self, v): def notify(self, v):
pass pass
def ouch(self, v): def ouch(self, v):
pass pass
def write(self, v): def write(self, v):
sys.stdout.write(str(v)) sys.stdout.write(str(v))
@ -48,13 +45,13 @@ class NullWriter:
class CmdCtrl: class CmdCtrl:
__cmd_alias = { __cmd_alias = {
'u': 'user', "u": "user",
't': 'tag', "t": "tag",
} }
__auth_for = [ __auth_for = [
'user', "user",
] ]
def __init__(self, argv=None, description=None, logger=None, *args, **kwargs): def __init__(self, argv=None, description=None, logger=None, *args, **kwargs):
self.args(argv) self.args(argv)
@ -62,7 +59,7 @@ class CmdCtrl:
self.logging(logger) self.logging(logger)
self.module() self.module()
self.config() self.config()
self.notifier() self.notifier()
@ -72,29 +69,36 @@ class CmdCtrl:
self.blockchain() self.blockchain()
self.remote_openers = {} self.remote_openers = {}
if self.get('META_URL') != None: if self.get("META_URL") != None:
auth_client_session = PGPClientSession(self.__auth) auth_client_session = PGPClientSession(self.__auth)
self.remote_openers['meta'] = HTTPSession(self.get('META_URL'), auth=auth_client_session, origin=self.config.get('META_HTTP_ORIGIN')) self.remote_openers["meta"] = HTTPSession(
self.get("META_URL"),
auth=auth_client_session,
origin=self.config.get("META_HTTP_ORIGIN"),
)
def blockchain(self): def blockchain(self):
self.chain_spec = ChainSpec.from_chain_str(self.config.get('CHAIN_SPEC')) self.chain_spec = ChainSpec.from_chain_str(self.config.get("CHAIN_SPEC"))
self.rpc = chainlib.eth.cli.Rpc() self.rpc = chainlib.eth.cli.Rpc()
self.__conn = self.rpc.connect_by_config(self.config) self.__conn = self.rpc.connect_by_config(self.config)
def args(self, argv): def args(self, argv):
self.argparser = chainlib.eth.cli.ArgumentParser(chainlib.eth.cli.argflag_std_read) self.argparser = chainlib.eth.cli.ArgumentParser(
chainlib.eth.cli.argflag_std_read
)
sub = self.argparser.add_subparsers() sub = self.argparser.add_subparsers()
sub.dest = 'command' sub.dest = "command"
sub_user = sub.add_parser('user', aliases=['u'], help='retrieve transactions for a user') sub_user = sub.add_parser(
"user", aliases=["u"], help="retrieve transactions for a user"
)
cmd_user.process_args(sub_user) cmd_user.process_args(sub_user)
sub_tag = sub.add_parser('tag', aliases=['t'], help='locally assign a display value to an identifier') sub_tag = sub.add_parser(
"tag", aliases=["t"], help="locally assign a display value to an identifier"
)
cmd_tag.process_args(sub_tag) cmd_tag.process_args(sub_tag)
self.cmd_args = self.argparser.parse_args(argv) self.cmd_args = self.argparser.parse_args(argv)
def module(self): def module(self):
self.cmd_string = self.cmd_args.command self.cmd_string = self.cmd_args.command
cmd_string_translate = self.__cmd_alias.get(self.cmd_string) cmd_string_translate = self.__cmd_alias.get(self.cmd_string)
@ -102,12 +106,11 @@ class CmdCtrl:
self.cmd_string = cmd_string_translate self.cmd_string = cmd_string_translate
if self.cmd_string == None: if self.cmd_string == None:
self.cmd_string = 'none' self.cmd_string = "none"
modname = 'clicada.cli.{}'.format(self.cmd_string)
self.logger.debug('using module {}'.format(modname))
self.cmd_mod = importlib.import_module(modname)
modname = "clicada.cli.{}".format(self.cmd_string)
self.logger.debug("using module {}".format(modname))
self.cmd_mod = importlib.import_module(modname)
def logging(self, logger): def logging(self, logger):
self.logger = logger self.logger = logger
@ -117,70 +120,75 @@ class CmdCtrl:
self.logger.setLevel(logging.DEBUG) self.logger.setLevel(logging.DEBUG)
elif self.cmd_args.v: elif self.cmd_args.v:
self.logger.setLevel(logging.INFO) self.logger.setLevel(logging.INFO)
def config(self): def config(self):
override_dir = self.cmd_args.config override_dir = self.cmd_args.config
if override_dir == None: if override_dir == None:
p = os.environ.get('HOME') p = os.environ.get("HOME")
if p != None: if p != None:
p = os.path.join(p, '.config', 'cic', 'clicada') p = os.path.join(p, ".config", "cic", "clicada")
try: try:
os.stat(p) os.stat(p)
override_dir = p override_dir = p
logg.info('applying user config override from standard location: {}'.format(p)) logg.info(
"applying user config override from standard location: {}".format(
p
)
)
except FileNotFoundError: except FileNotFoundError:
pass pass
extra_args = self.cmd_mod.extra_args() extra_args = self.cmd_mod.extra_args()
self.config = chainlib.eth.cli.Config.from_args(self.cmd_args, base_config_dir=base_config_dir, extra_args=extra_args, default_config_dir=override_dir) self.config = chainlib.eth.cli.Config.from_args(
self.cmd_args,
base_config_dir=base_config_dir,
extra_args=extra_args,
default_config_dir=override_dir,
)
self.config.add(False, '_SEQ') self.config.add(False, "_SEQ")
self.config.censor('AUTH_PASSPHRASE') self.config.censor("AUTH_PASSPHRASE")
self.logger.debug('loaded config:\n{}'.format(self.config))
self.logger.debug("loaded config:\n{}".format(self.config))
def auth(self): def auth(self):
typ = self.get('AUTH_TYPE') typ = self.get("AUTH_TYPE")
if typ != 'gnupg': if typ != "gnupg":
raise NotImplementedError('Valid aut implementations are: gnupg') raise NotImplementedError("Valid aut implementations are: gnupg")
default_auth_db_path = None default_auth_db_path = None
if os.environ.get('HOME') != None: if os.environ.get("HOME") != None:
default_auth_db_path = os.path.join(os.environ['HOME'], '.local/share/cic/clicada') default_auth_db_path = os.path.join(
auth_db_path = self.get('AUTH_DB_PATH', default_auth_db_path) os.environ["HOME"], ".local/share/cic/clicada"
self.__auth = PGPAuthCrypt(auth_db_path, self.get('AUTH_KEY'), self.get('AUTH_KEYRING_PATH')) )
self.__auth.get_secret(self.get('AUTH_PASSPHRASE')) auth_db_path = self.get("AUTH_DB_PATH", default_auth_db_path)
self.__auth = PGPAuthCrypt(
auth_db_path, self.get("AUTH_KEY"), self.get("AUTH_KEYRING_PATH")
)
self.__auth.get_secret(self.get("AUTH_PASSPHRASE"))
self.encrypter = AESCTREncrypt(auth_db_path, self.__auth.secret) self.encrypter = AESCTREncrypt(auth_db_path, self.__auth.secret)
def get(self, k, default=None): def get(self, k, default=None):
r = self.config.get(k, default) r = self.config.get(k, default)
if k in [ if k in [
'_FORCE', "_FORCE",
]: ]:
if r == None: if r == None:
return False return False
return self.config.true(k) return self.config.true(k)
return r return r
def chain(self): def chain(self):
return self.chain_spec return self.chain_spec
def conn(self): def conn(self):
return self.__conn return self.__conn
def execute(self): def execute(self):
self.cmd_mod.execute(self) self.cmd_mod.execute(self)
def opener(self, k): def opener(self, k):
return self.remote_openers[k] return self.remote_openers[k]
def notifier(self): def notifier(self):
if logg.root.level >= logging.WARNING: if logg.root.level >= logging.WARNING:
logging.disable() logging.disable()
@ -188,18 +196,12 @@ class CmdCtrl:
else: else:
self.writer = NullWriter() self.writer = NullWriter()
def notify(self, v): def notify(self, v):
self.writer.notify(v) if logg.root.level <= logging.INFO:
print("\033[96m" + v + "\033[0m")
def ouch(self, v): def ouch(self, v):
self.writer.ouch(v) print("\033[91m" + v + "\033[0m")
print()
def write(self, v): def write(self, v):
self.writer.write("") print(v)
self.writer.write(v)
print()

View File

@ -1,27 +1,24 @@
# standard imports # standard imports
import sys
import logging
import datetime import datetime
from pathlib import Path import logging
import os import os
import sys
from pathlib import Path
from chainlib.encode import TxHexNormalizer
from chainlib.eth.address import is_address, to_checksum_address
# external imports # external imports
from cic_eth_registry import CICRegistry from cic_eth_registry import CICRegistry
from cic_eth_registry.lookup.tokenindex import TokenIndexLookup from cic_eth_registry.lookup.tokenindex import TokenIndexLookup
from cic_types.models.person import Person from cic_types.models.person import Person
from chainlib.eth.address import to_checksum_address from clicada.error import MetadataNotFoundError
from chainlib.encode import TxHexNormalizer from clicada.token import FileTokenStore, token_balance
from hexathon import add_0x
# local imports # local imports
from clicada.tx import TxGetter from clicada.tx import ResolvedTokenTx, TxGetter
from clicada.user import FileUserStore from clicada.user import FileUserStore
from clicada.token import ( from hexathon import add_0x
FileTokenStore,
token_balance,
)
from clicada.tx import ResolvedTokenTx
from clicada.error import MetadataNotFoundError
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@ -29,24 +26,36 @@ tx_normalizer = TxHexNormalizer()
def process_args(argparser): def process_args(argparser):
argparser.add_argument('-m', '--method', type=str, help='lookup method') argparser.add_argument("-m", "--method", type=str, help="lookup method")
argparser.add_argument('--meta-url', dest='meta_url', type=str, help='Url to retrieve metadata from') argparser.add_argument(
argparser.add_argument('-f', '--force-update', dest='force_update', action='store_true', help='Update records of mutable entries') "--meta-url", dest="meta_url", type=str, help="Url to retrieve metadata from"
argparser.add_argument('identifier', type=str, help='user identifier') )
argparser.add_argument(
"-f",
"--force-update",
dest="force_update",
action="store_true",
help="Update records of mutable entries",
)
argparser.add_argument(
"identifier", type=str, help="user identifier (phone_number or address)"
)
def extra_args(): def extra_args():
return { return {
'force_update': '_FORCE', "force_update": "_FORCE",
'method': 'META_LOOKUP_METHOD', "method": "META_LOOKUP_METHOD",
'meta_url': 'META_URL', "meta_url": "META_URL",
'identifier': '_IDENTIFIER', "identifier": "_IDENTIFIER",
} }
def apply_args(config, args): def apply_args(config, args):
if config.get('META_LOOKUP_METHOD'): if config.get("META_LOOKUP_METHOD"):
raise NotImplementedError('Sorry, currently only "phone" lookup method is implemented') raise NotImplementedError(
'Sorry, currently only "phone" lookup method is implemented'
)
def validate(config, args): def validate(config, args):
@ -54,79 +63,101 @@ def validate(config, args):
def execute(ctrl): def execute(ctrl):
tx_getter = TxGetter(ctrl.get('TX_CACHE_URL'), 10) tx_getter = TxGetter(ctrl.get("TX_CACHE_URL"), 10)
store_path = os.path.join(str(Path.home()), '.clicada') store_path = os.path.join(str(Path.home()), ".clicada")
user_phone_file_label = 'phone' user_phone_file_label = "phone"
user_phone_store = FileUserStore(ctrl.opener('meta'), ctrl.chain(), user_phone_file_label, store_path, int(ctrl.get('FILESTORE_TTL')), encrypter=ctrl.encrypter) user_phone_store = FileUserStore(
ctrl.opener("meta"),
ctrl.chain(),
user_phone_file_label,
store_path,
int(ctrl.get("FILESTORE_TTL")),
encrypter=ctrl.encrypter,
)
ctrl.notify('resolving identifier {} to wallet address'.format(ctrl.get('_IDENTIFIER'))) identifier = ctrl.get("_IDENTIFIER")
user_address = user_phone_store.by_phone(ctrl.get('_IDENTIFIER'), update=ctrl.get('_FORCE')) ctrl.notify("resolving identifier {} to wallet address".format(identifier))
if is_address(identifier):
user_address = identifier
else:
user_address = user_phone_store.by_phone(identifier, update=ctrl.get("_FORCE"))
if user_address == None: if user_address == None:
ctrl.ouch('unknown identifier: {}\n'.format(ctrl.get('_IDENTIFIER'))) ctrl.ouch("unknown identifier: {}\n".format(identifier))
sys.exit(1) sys.exit(1)
try: try:
user_address = to_checksum_address(user_address) user_address = to_checksum_address(user_address)
except ValueError: except ValueError:
ctrl.ouch('invalid response "{}" for {}\n'.format(user_address, ctrl.get('_IDENTIFIER'))) ctrl.ouch('invalid response "{}" for {}\n'.format(user_address, identifier))
sys.exit(1) sys.exit(1)
logg.debug('loaded user address {} for {}'.format(user_address, ctrl.get('_IDENTIFIER'))) logg.debug("loaded user address {} for {}".format(user_address, identifier))
user_address_normal = tx_normalizer.wallet_address(user_address) user_address_normal = tx_normalizer.wallet_address(user_address)
ctrl.notify('retrieving txs for address {}'.format(user_address_normal)) ctrl.notify("retrieving txs for address {}".format(user_address_normal))
txs = tx_getter.get(user_address) txs = tx_getter.get(user_address)
token_store = FileTokenStore(ctrl.chain(), ctrl.conn(), 'token', store_path) token_store = FileTokenStore(ctrl.chain(), ctrl.conn(), "token", store_path)
user_address_file_label = 'address' user_address_file_label = "address"
user_address_store = FileUserStore(ctrl.opener('meta'), ctrl.chain(), user_address_file_label, store_path, int(ctrl.get('FILESTORE_TTL')), encrypter=ctrl.encrypter) user_address_store = FileUserStore(
ctrl.opener("meta"),
ctrl.notify('resolving metadata for address {}'.format(user_address_normal)) ctrl.chain(),
try: user_address_file_label,
r = user_address_store.by_address(user_address_normal, update=ctrl.get('_FORCE')) store_path,
except MetadataNotFoundError as e: int(ctrl.get("FILESTORE_TTL")),
ctrl.ouch('could not resolve metadata for user: {}'.format(e)) encrypter=ctrl.encrypter,
sys.exit(1)
ctrl.write("""Phone: {}
Network address: {}
Chain: {}
Name: {}
Registered: {}
Gender: {}
Location: {}
Products: {}
Tags: {}""".format(
ctrl.get('_IDENTIFIER'),
add_0x(user_address),
ctrl.chain().common_name(),
str(r),
datetime.datetime.fromtimestamp(r.date_registered).ctime(),
r.gender,
r.location['area_name'],
','.join(r.products),
','.join(r.tags),
) )
)
r = None
ctrl.write(
f"""
Phone: {ctrl.get("_IDENTIFIER")}
Network address: {add_0x(user_address)}
Chain: {ctrl.chain().common_name()}"""
)
ctrl.notify("resolving metadata for address {}".format(user_address_normal))
try:
r = user_address_store.by_address(
user_address_normal, update=ctrl.get("_FORCE")
)
if r:
ctrl.write(
f"""
Name: { str(r)}
Registered: {datetime.datetime.fromtimestamp(r.date_registered).ctime()}
Gender: {r.gender}
Location: {r.location["area_name"]}
Products: {",".join(r.products)}
Tags: {",".join(r.tags)}"""
)
except MetadataNotFoundError as e:
ctrl.ouch(f"MetadataNotFoundError: Could not resolve metadata for user {e}\n")
tx_lines = [] tx_lines = []
seen_tokens = {} seen_tokens = {}
for tx_src in txs['data']: for tx_src in txs["data"]:
ctrl.notify('resolve details for tx {}'.format(tx_src['tx_hash'])) ctrl.notify("resolve details for tx {}".format(tx_src["tx_hash"]))
tx = ResolvedTokenTx.from_dict(tx_src) tx = ResolvedTokenTx.from_dict(tx_src)
tx.resolve(token_store, user_address_store, show_decimals=True, update=ctrl.get('_FORCE')) tx.resolve(
token_store,
user_address_store,
show_decimals=True,
update=ctrl.get("_FORCE"),
)
tx_lines.append(tx) tx_lines.append(tx)
seen_tokens[tx.source_token_label] = tx.source_token seen_tokens[tx.source_token_label] = tx.source_token
seen_tokens[tx.destination_token_label] = tx.destination_token seen_tokens[tx.destination_token_label] = tx.destination_token
for k in seen_tokens.keys(): for k in seen_tokens.keys():
ctrl.notify('resolve token {}'.format(seen_tokens[k])) ctrl.notify("resolve token {}".format(seen_tokens[k]))
(token_symbol, token_decimals) = token_store.by_address(seen_tokens[k]) (token_symbol, token_decimals) = token_store.by_address(seen_tokens[k])
ctrl.notify('get token balance for {} => {}'.format(token_symbol, seen_tokens[k])) ctrl.notify(
"get token balance for {} => {}".format(token_symbol, seen_tokens[k])
)
balance = token_balance(ctrl.chain(), ctrl.conn(), seen_tokens[k], user_address) balance = token_balance(ctrl.chain(), ctrl.conn(), seen_tokens[k], user_address)
fmt = '{:.' + str(token_decimals) + 'f}' fmt = "{:." + str(token_decimals) + "f}"
decimal_balance = fmt.format(balance / (10 ** token_decimals)) decimal_balance = fmt.format(balance / (10**token_decimals))
ctrl.write("Balances:\n {} {}".format(token_symbol, decimal_balance)) ctrl.write("Balances:\n {} {}".format(token_symbol, decimal_balance))
print() print()

View File

@ -12,7 +12,7 @@ from cic_types.condiments import MetadataPointer
from cic_types.models.person import Person from cic_types.models.person import Person
from cic_types.ext.requests import make_request from cic_types.ext.requests import make_request
from cic_types.processor import generate_metadata_pointer from cic_types.processor import generate_metadata_pointer
import requests.exceptions from requests.exceptions import HTTPError
import phonenumbers import phonenumbers
# local imports # local imports
@ -222,7 +222,7 @@ class FileUserStore:
try: try:
r = getter.open(ptr) r = getter.open(ptr)
user_address = json.loads(r) user_address = json.loads(r)
except requests.exceptions.HTTPError as e: except HTTPError as e:
logg.debug('no address found for phone {}: {}'.format(phone, e)) logg.debug('no address found for phone {}: {}'.format(phone, e))
return None return None
@ -269,7 +269,7 @@ class FileUserStore:
except Exception as e: except Exception as e:
logg.debug('no metadata found for {}: {}'.format(address, e)) logg.debug('no metadata found for {}: {}'.format(address, e))
if r == None: if not r:
self.failed_entities[address] = True self.failed_entities[address] = True
raise MetadataNotFoundError() raise MetadataNotFoundError()

View File

@ -1,10 +1,10 @@
usumbufu~=0.3.5 usumbufu~=0.3.8
confini~=0.5.3 confini~=0.6.0
cic-eth-registry~=0.6.1 cic-eth-registry~=0.6.9
cic-types~=0.2.1a8 cic-types~=0.2.2
phonenumbers==8.12.12 phonenumbers==8.12.12
eth-erc20~=0.1.2 eth-erc20~=0.3.0
hexathon~=0.1.0 hexathon~=0.1.0
pycryptodome~=3.10.1 pycryptodome~=3.10.1
chainlib-eth~=0.0.21 chainlib-eth~=0.1.0
chainlib~=0.0.17 chainlib~=0.1.0

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = clicada name = clicada
version = 0.0.7 version = 0.1.3
description = CLI CRM tool for the cic-stack custodial wallet system description = CLI CRM tool for the cic-stack custodial wallet system
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no