Compare commits

..

No commits in common. "master" and "lum/store_path" have entirely different histories.

7 changed files with 154 additions and 191 deletions

2
.gitignore vendored
View File

@ -4,5 +4,3 @@ build/
*.pyc *.pyc
.venv .venv
.clicada .clicada
dist/
.vscode/

View File

@ -1,5 +1,3 @@
- 0.0.7
* fix: make store_path relative to the users home
- 0.0.6 - 0.0.6
* Add cache encryption, with AES-CTR-128 * Add cache encryption, with AES-CTR-128
- 0.0.5 - 0.0.5

View File

@ -1,43 +1,46 @@
# import notifier # import notifier
from clicada.cli.notify import NotifyWriter from clicada.cli.notify import NotifyWriter
notifier = NotifyWriter() notifier = NotifyWriter()
# notifier.notify('loading script') #notifier.notify('loading script')
import importlib
import logging
# standard imports # standard imports
import os import os
import logging
import importlib
import sys import sys
import chainlib.eth.cli
import clicada.cli.tag as cmd_tag
# local imports
import clicada.cli.user as cmd_user
# external imports # external imports
import confini import confini
import chainlib.eth.cli
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
# local imports
import clicada.cli.user as cmd_user
import clicada.cli.tag as cmd_tag
from clicada.cli.auth import PGPAuthCrypt from clicada.cli.auth import PGPAuthCrypt
from clicada.cli.http import HTTPSession, PGPClientSession from clicada.cli.http import (
HTTPSession,
PGPClientSession,
)
from clicada.crypt.aes import AESCTREncrypt from clicada.crypt.aes import AESCTREncrypt
logg = logging.getLogger() logg = logging.getLogger()
script_dir = os.path.dirname(os.path.realpath(__file__)) script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, "..", "data") data_dir = os.path.join(script_dir, '..', 'data')
base_config_dir = os.path.join(data_dir, "config") base_config_dir = os.path.join(data_dir, 'config')
class NullWriter: class NullWriter:
def notify(self, v): def notify(self, v):
pass pass
def ouch(self, v): def ouch(self, v):
pass pass
def write(self, v): def write(self, v):
sys.stdout.write(str(v)) sys.stdout.write(str(v))
@ -45,13 +48,13 @@ class NullWriter:
class CmdCtrl: class CmdCtrl:
__cmd_alias = { __cmd_alias = {
"u": "user", 'u': 'user',
"t": "tag", 't': 'tag',
} }
__auth_for = [ __auth_for = [
"user", 'user',
] ]
def __init__(self, argv=None, description=None, logger=None, *args, **kwargs): def __init__(self, argv=None, description=None, logger=None, *args, **kwargs):
self.args(argv) self.args(argv)
@ -69,36 +72,29 @@ class CmdCtrl:
self.blockchain() self.blockchain()
self.remote_openers = {} self.remote_openers = {}
if self.get("META_URL") != None: if self.get('META_URL') != None:
auth_client_session = PGPClientSession(self.__auth) auth_client_session = PGPClientSession(self.__auth)
self.remote_openers["meta"] = HTTPSession( self.remote_openers['meta'] = HTTPSession(self.get('META_URL'), auth=auth_client_session, origin=self.config.get('META_HTTP_ORIGIN'))
self.get("META_URL"),
auth=auth_client_session,
origin=self.config.get("META_HTTP_ORIGIN"),
)
def blockchain(self): def blockchain(self):
self.chain_spec = ChainSpec.from_chain_str(self.config.get("CHAIN_SPEC")) self.chain_spec = ChainSpec.from_chain_str(self.config.get('CHAIN_SPEC'))
self.rpc = chainlib.eth.cli.Rpc() self.rpc = chainlib.eth.cli.Rpc()
self.__conn = self.rpc.connect_by_config(self.config) self.__conn = self.rpc.connect_by_config(self.config)
def args(self, argv): def args(self, argv):
self.argparser = chainlib.eth.cli.ArgumentParser( self.argparser = chainlib.eth.cli.ArgumentParser(chainlib.eth.cli.argflag_std_read)
chainlib.eth.cli.argflag_std_read
)
sub = self.argparser.add_subparsers() sub = self.argparser.add_subparsers()
sub.dest = "command" sub.dest = 'command'
sub_user = sub.add_parser( sub_user = sub.add_parser('user', aliases=['u'], help='retrieve transactions for a user')
"user", aliases=["u"], help="retrieve transactions for a user"
)
cmd_user.process_args(sub_user) cmd_user.process_args(sub_user)
sub_tag = sub.add_parser( sub_tag = sub.add_parser('tag', aliases=['t'], help='locally assign a display value to an identifier')
"tag", aliases=["t"], help="locally assign a display value to an identifier"
)
cmd_tag.process_args(sub_tag) cmd_tag.process_args(sub_tag)
self.cmd_args = self.argparser.parse_args(argv) self.cmd_args = self.argparser.parse_args(argv)
def module(self): def module(self):
self.cmd_string = self.cmd_args.command self.cmd_string = self.cmd_args.command
cmd_string_translate = self.__cmd_alias.get(self.cmd_string) cmd_string_translate = self.__cmd_alias.get(self.cmd_string)
@ -106,12 +102,13 @@ class CmdCtrl:
self.cmd_string = cmd_string_translate self.cmd_string = cmd_string_translate
if self.cmd_string == None: if self.cmd_string == None:
self.cmd_string = "none" self.cmd_string = 'none'
modname = "clicada.cli.{}".format(self.cmd_string) modname = 'clicada.cli.{}'.format(self.cmd_string)
self.logger.debug("using module {}".format(modname)) self.logger.debug('using module {}'.format(modname))
self.cmd_mod = importlib.import_module(modname) self.cmd_mod = importlib.import_module(modname)
def logging(self, logger): def logging(self, logger):
self.logger = logger self.logger = logger
if self.logger == None: if self.logger == None:
@ -121,74 +118,69 @@ class CmdCtrl:
elif self.cmd_args.v: elif self.cmd_args.v:
self.logger.setLevel(logging.INFO) self.logger.setLevel(logging.INFO)
def config(self): def config(self):
override_dir = self.cmd_args.config override_dir = self.cmd_args.config
if override_dir == None: if override_dir == None:
p = os.environ.get("HOME") p = os.environ.get('HOME')
if p != None: if p != None:
p = os.path.join(p, ".config", "cic", "clicada") p = os.path.join(p, '.config', 'cic', 'clicada')
try: try:
os.stat(p) os.stat(p)
override_dir = p override_dir = p
logg.info( logg.info('applying user config override from standard location: {}'.format(p))
"applying user config override from standard location: {}".format(
p
)
)
except FileNotFoundError: except FileNotFoundError:
pass pass
extra_args = self.cmd_mod.extra_args() extra_args = self.cmd_mod.extra_args()
self.config = chainlib.eth.cli.Config.from_args( self.config = chainlib.eth.cli.Config.from_args(self.cmd_args, base_config_dir=base_config_dir, extra_args=extra_args, default_config_dir=override_dir)
self.cmd_args,
base_config_dir=base_config_dir,
extra_args=extra_args,
default_config_dir=override_dir,
)
self.config.add(False, "_SEQ") self.config.add(False, '_SEQ')
self.config.censor("AUTH_PASSPHRASE") self.config.censor('AUTH_PASSPHRASE')
self.logger.debug('loaded config:\n{}'.format(self.config))
self.logger.debug("loaded config:\n{}".format(self.config))
def auth(self): def auth(self):
typ = self.get("AUTH_TYPE") typ = self.get('AUTH_TYPE')
if typ != "gnupg": if typ != 'gnupg':
raise NotImplementedError("Valid aut implementations are: gnupg") raise NotImplementedError('Valid aut implementations are: gnupg')
default_auth_db_path = None default_auth_db_path = None
if os.environ.get("HOME") != None: if os.environ.get('HOME') != None:
default_auth_db_path = os.path.join( default_auth_db_path = os.path.join(os.environ['HOME'], '.local/share/cic/clicada')
os.environ["HOME"], ".local/share/cic/clicada" auth_db_path = self.get('AUTH_DB_PATH', default_auth_db_path)
) self.__auth = PGPAuthCrypt(auth_db_path, self.get('AUTH_KEY'), self.get('AUTH_KEYRING_PATH'))
auth_db_path = self.get("AUTH_DB_PATH", default_auth_db_path) self.__auth.get_secret(self.get('AUTH_PASSPHRASE'))
self.__auth = PGPAuthCrypt(
auth_db_path, self.get("AUTH_KEY"), self.get("AUTH_KEYRING_PATH")
)
self.__auth.get_secret(self.get("AUTH_PASSPHRASE"))
self.encrypter = AESCTREncrypt(auth_db_path, self.__auth.secret) self.encrypter = AESCTREncrypt(auth_db_path, self.__auth.secret)
def get(self, k, default=None): def get(self, k, default=None):
r = self.config.get(k, default) r = self.config.get(k, default)
if k in [ if k in [
"_FORCE", '_FORCE',
]: ]:
if r == None: if r == None:
return False return False
return self.config.true(k) return self.config.true(k)
return r return r
def chain(self): def chain(self):
return self.chain_spec return self.chain_spec
def conn(self): def conn(self):
return self.__conn return self.__conn
def execute(self): def execute(self):
self.cmd_mod.execute(self) self.cmd_mod.execute(self)
def opener(self, k): def opener(self, k):
return self.remote_openers[k] return self.remote_openers[k]
def notifier(self): def notifier(self):
if logg.root.level >= logging.WARNING: if logg.root.level >= logging.WARNING:
logging.disable() logging.disable()
@ -196,12 +188,18 @@ class CmdCtrl:
else: else:
self.writer = NullWriter() self.writer = NullWriter()
def notify(self, v): def notify(self, v):
if logg.root.level <= logging.INFO: self.writer.notify(v)
print("\033[96m" + v + "\033[0m")
def ouch(self, v): def ouch(self, v):
print("\033[91m" + v + "\033[0m") self.writer.ouch(v)
print()
def write(self, v): def write(self, v):
print(v) self.writer.write("")
self.writer.write(v)
print()

View File

@ -1,24 +1,27 @@
# standard imports # standard imports
import datetime
import logging
import os
import sys import sys
import logging
import datetime
from pathlib import Path from pathlib import Path
import os
from chainlib.encode import TxHexNormalizer
from chainlib.eth.address import is_address, to_checksum_address
# external imports # external imports
from cic_eth_registry import CICRegistry from cic_eth_registry import CICRegistry
from cic_eth_registry.lookup.tokenindex import TokenIndexLookup from cic_eth_registry.lookup.tokenindex import TokenIndexLookup
from cic_types.models.person import Person from cic_types.models.person import Person
from clicada.error import MetadataNotFoundError from chainlib.eth.address import to_checksum_address
from clicada.token import FileTokenStore, token_balance from chainlib.encode import TxHexNormalizer
from hexathon import add_0x
# local imports # local imports
from clicada.tx import ResolvedTokenTx, TxGetter from clicada.tx import TxGetter
from clicada.user import FileUserStore from clicada.user import FileUserStore
from hexathon import add_0x from clicada.token import (
FileTokenStore,
token_balance,
)
from clicada.tx import ResolvedTokenTx
from clicada.error import MetadataNotFoundError
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
@ -26,36 +29,24 @@ tx_normalizer = TxHexNormalizer()
def process_args(argparser): def process_args(argparser):
argparser.add_argument("-m", "--method", type=str, help="lookup method") argparser.add_argument('-m', '--method', type=str, help='lookup method')
argparser.add_argument( argparser.add_argument('--meta-url', dest='meta_url', type=str, help='Url to retrieve metadata from')
"--meta-url", dest="meta_url", type=str, help="Url to retrieve metadata from" argparser.add_argument('-f', '--force-update', dest='force_update', action='store_true', help='Update records of mutable entries')
) argparser.add_argument('identifier', type=str, help='user identifier')
argparser.add_argument(
"-f",
"--force-update",
dest="force_update",
action="store_true",
help="Update records of mutable entries",
)
argparser.add_argument(
"identifier", type=str, help="user identifier (phone_number or address)"
)
def extra_args(): def extra_args():
return { return {
"force_update": "_FORCE", 'force_update': '_FORCE',
"method": "META_LOOKUP_METHOD", 'method': 'META_LOOKUP_METHOD',
"meta_url": "META_URL", 'meta_url': 'META_URL',
"identifier": "_IDENTIFIER", 'identifier': '_IDENTIFIER',
} }
def apply_args(config, args): def apply_args(config, args):
if config.get("META_LOOKUP_METHOD"): if config.get('META_LOOKUP_METHOD'):
raise NotImplementedError( raise NotImplementedError('Sorry, currently only "phone" lookup method is implemented')
'Sorry, currently only "phone" lookup method is implemented'
)
def validate(config, args): def validate(config, args):
@ -63,101 +54,79 @@ def validate(config, args):
def execute(ctrl): def execute(ctrl):
tx_getter = TxGetter(ctrl.get("TX_CACHE_URL"), 10) tx_getter = TxGetter(ctrl.get('TX_CACHE_URL'), 10)
store_path = os.path.join(str(Path.home()), ".clicada") store_path = os.path.join(str(Path.home()), '.clicada')
user_phone_file_label = "phone" user_phone_file_label = 'phone'
user_phone_store = FileUserStore( user_phone_store = FileUserStore(ctrl.opener('meta'), ctrl.chain(), user_phone_file_label, store_path, int(ctrl.get('FILESTORE_TTL')), encrypter=ctrl.encrypter)
ctrl.opener("meta"),
ctrl.chain(),
user_phone_file_label,
store_path,
int(ctrl.get("FILESTORE_TTL")),
encrypter=ctrl.encrypter,
)
identifier = ctrl.get("_IDENTIFIER") ctrl.notify('resolving identifier {} to wallet address'.format(ctrl.get('_IDENTIFIER')))
ctrl.notify("resolving identifier {} to wallet address".format(identifier)) user_address = user_phone_store.by_phone(ctrl.get('_IDENTIFIER'), update=ctrl.get('_FORCE'))
if is_address(identifier):
user_address = identifier
else:
user_address = user_phone_store.by_phone(identifier, update=ctrl.get("_FORCE"))
if user_address == None: if user_address == None:
ctrl.ouch("unknown identifier: {}\n".format(identifier)) ctrl.ouch('unknown identifier: {}\n'.format(ctrl.get('_IDENTIFIER')))
sys.exit(1) sys.exit(1)
try: try:
user_address = to_checksum_address(user_address) user_address = to_checksum_address(user_address)
except ValueError: except ValueError:
ctrl.ouch('invalid response "{}" for {}\n'.format(user_address, identifier)) ctrl.ouch('invalid response "{}" for {}\n'.format(user_address, ctrl.get('_IDENTIFIER')))
sys.exit(1) sys.exit(1)
logg.debug("loaded user address {} for {}".format(user_address, identifier)) logg.debug('loaded user address {} for {}'.format(user_address, ctrl.get('_IDENTIFIER')))
user_address_normal = tx_normalizer.wallet_address(user_address) user_address_normal = tx_normalizer.wallet_address(user_address)
ctrl.notify("retrieving txs for address {}".format(user_address_normal)) ctrl.notify('retrieving txs for address {}'.format(user_address_normal))
txs = tx_getter.get(user_address) txs = tx_getter.get(user_address)
token_store = FileTokenStore(ctrl.chain(), ctrl.conn(), "token", store_path) token_store = FileTokenStore(ctrl.chain(), ctrl.conn(), 'token', store_path)
user_address_file_label = "address" user_address_file_label = 'address'
user_address_store = FileUserStore( user_address_store = FileUserStore(ctrl.opener('meta'), ctrl.chain(), user_address_file_label, store_path, int(ctrl.get('FILESTORE_TTL')), encrypter=ctrl.encrypter)
ctrl.opener("meta"),
ctrl.chain(),
user_address_file_label,
store_path,
int(ctrl.get("FILESTORE_TTL")),
encrypter=ctrl.encrypter,
)
r = None ctrl.notify('resolving metadata for address {}'.format(user_address_normal))
ctrl.write(
f"""
Phone: {ctrl.get("_IDENTIFIER")}
Network address: {add_0x(user_address)}
Chain: {ctrl.chain().common_name()}"""
)
ctrl.notify("resolving metadata for address {}".format(user_address_normal))
try: try:
r = user_address_store.by_address( r = user_address_store.by_address(user_address_normal, update=ctrl.get('_FORCE'))
user_address_normal, update=ctrl.get("_FORCE")
)
if r:
ctrl.write(
f"""
Name: { str(r)}
Registered: {datetime.datetime.fromtimestamp(r.date_registered).ctime()}
Gender: {r.gender}
Location: {r.location["area_name"]}
Products: {",".join(r.products)}
Tags: {",".join(r.tags)}"""
)
except MetadataNotFoundError as e: except MetadataNotFoundError as e:
ctrl.ouch(f"MetadataNotFoundError: Could not resolve metadata for user {e}\n") ctrl.ouch('could not resolve metadata for user: {}'.format(e))
sys.exit(1)
ctrl.write("""Phone: {}
Network address: {}
Chain: {}
Name: {}
Registered: {}
Gender: {}
Location: {}
Products: {}
Tags: {}""".format(
ctrl.get('_IDENTIFIER'),
add_0x(user_address),
ctrl.chain().common_name(),
str(r),
datetime.datetime.fromtimestamp(r.date_registered).ctime(),
r.gender,
r.location['area_name'],
','.join(r.products),
','.join(r.tags),
)
)
tx_lines = [] tx_lines = []
seen_tokens = {} seen_tokens = {}
for tx_src in txs["data"]: for tx_src in txs['data']:
ctrl.notify("resolve details for tx {}".format(tx_src["tx_hash"])) ctrl.notify('resolve details for tx {}'.format(tx_src['tx_hash']))
tx = ResolvedTokenTx.from_dict(tx_src) tx = ResolvedTokenTx.from_dict(tx_src)
tx.resolve( tx.resolve(token_store, user_address_store, show_decimals=True, update=ctrl.get('_FORCE'))
token_store,
user_address_store,
show_decimals=True,
update=ctrl.get("_FORCE"),
)
tx_lines.append(tx) tx_lines.append(tx)
seen_tokens[tx.source_token_label] = tx.source_token seen_tokens[tx.source_token_label] = tx.source_token
seen_tokens[tx.destination_token_label] = tx.destination_token seen_tokens[tx.destination_token_label] = tx.destination_token
for k in seen_tokens.keys(): for k in seen_tokens.keys():
ctrl.notify("resolve token {}".format(seen_tokens[k])) ctrl.notify('resolve token {}'.format(seen_tokens[k]))
(token_symbol, token_decimals) = token_store.by_address(seen_tokens[k]) (token_symbol, token_decimals) = token_store.by_address(seen_tokens[k])
ctrl.notify( ctrl.notify('get token balance for {} => {}'.format(token_symbol, seen_tokens[k]))
"get token balance for {} => {}".format(token_symbol, seen_tokens[k])
)
balance = token_balance(ctrl.chain(), ctrl.conn(), seen_tokens[k], user_address) balance = token_balance(ctrl.chain(), ctrl.conn(), seen_tokens[k], user_address)
fmt = "{:." + str(token_decimals) + "f}" fmt = '{:.' + str(token_decimals) + 'f}'
decimal_balance = fmt.format(balance / (10**token_decimals)) decimal_balance = fmt.format(balance / (10 ** token_decimals))
ctrl.write("Balances:\n {} {}".format(token_symbol, decimal_balance)) ctrl.write("Balances:\n {} {}".format(token_symbol, decimal_balance))
print() print()

View File

@ -12,7 +12,7 @@ from cic_types.condiments import MetadataPointer
from cic_types.models.person import Person from cic_types.models.person import Person
from cic_types.ext.requests import make_request from cic_types.ext.requests import make_request
from cic_types.processor import generate_metadata_pointer from cic_types.processor import generate_metadata_pointer
from requests.exceptions import HTTPError import requests.exceptions
import phonenumbers import phonenumbers
# local imports # local imports
@ -222,7 +222,7 @@ class FileUserStore:
try: try:
r = getter.open(ptr) r = getter.open(ptr)
user_address = json.loads(r) user_address = json.loads(r)
except HTTPError as e: except requests.exceptions.HTTPError as e:
logg.debug('no address found for phone {}: {}'.format(phone, e)) logg.debug('no address found for phone {}: {}'.format(phone, e))
return None return None
@ -269,7 +269,7 @@ class FileUserStore:
except Exception as e: except Exception as e:
logg.debug('no metadata found for {}: {}'.format(address, e)) logg.debug('no metadata found for {}: {}'.format(address, e))
if not r: if r == None:
self.failed_entities[address] = True self.failed_entities[address] = True
raise MetadataNotFoundError() raise MetadataNotFoundError()

View File

@ -1,10 +1,10 @@
usumbufu~=0.3.8 usumbufu~=0.3.5
confini~=0.6.0 confini~=0.5.3
cic-eth-registry~=0.6.9 cic-eth-registry~=0.6.1
cic-types~=0.2.2 cic-types~=0.2.1a8
phonenumbers==8.12.12 phonenumbers==8.12.12
eth-erc20~=0.3.0 eth-erc20~=0.1.2
hexathon~=0.1.0 hexathon~=0.1.0
pycryptodome~=3.10.1 pycryptodome~=3.10.1
chainlib-eth~=0.1.0 chainlib-eth~=0.0.21
chainlib~=0.1.0 chainlib~=0.0.17

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = clicada name = clicada
version = 0.1.3 version = 0.0.6
description = CLI CRM tool for the cic-stack custodial wallet system description = CLI CRM tool for the cic-stack custodial wallet system
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no