From 99156a1dba2845a17d43e6b3ad3d3bed5cf8b7ab Mon Sep 17 00:00:00 2001 From: nolash Date: Sun, 7 Nov 2021 03:52:52 +0100 Subject: [PATCH] Add phone numbers to listings --- clicada/cli/arg.py | 72 +++++++++++++++++++++++++++++--------------- clicada/cli/auth.py | 36 +++++++++++----------- clicada/cli/http.py | 35 +++++++++++++++++++-- clicada/user/file.py | 19 ++++++++---- 4 files changed, 111 insertions(+), 51 deletions(-) diff --git a/clicada/cli/arg.py b/clicada/cli/arg.py index dde7052..873a34f 100644 --- a/clicada/cli/arg.py +++ b/clicada/cli/arg.py @@ -13,7 +13,10 @@ from chainlib.chain import ChainSpec import clicada.cli.user as cmd_user import clicada.cli.tag as cmd_tag from clicada.cli.auth import PGPAuthCrypt -from clicada.cli.http import HTTPSession +from clicada.cli.http import ( + HTTPSession, + PGPClientSession, + ) script_dir = os.path.dirname(os.path.realpath(__file__)) data_dir = os.path.join(script_dir, '..', 'data') @@ -32,8 +35,32 @@ class CmdCtrl: ] def __init__(self, argv=None, description=None, logger=None, *args, **kwargs): - self.argparser = chainlib.eth.cli.ArgumentParser(chainlib.eth.cli.argflag_std_read) + self.args(argv) + self.logging(logger) + + self.module() + + self.config() + + self.auth() + + self.blockchain() + + self.remote_openers = {} + if self.get('META_URL') != None: + auth_client_session = PGPClientSession(self.__auth) + self.remote_openers['meta'] = HTTPSession(self.get('META_URL'), auth=auth_client_session) + + + def blockchain(self): + self.chain_spec = ChainSpec.from_chain_str(self.config.get('CHAIN_SPEC')) + self.rpc = chainlib.eth.cli.Rpc() + self.__conn = self.rpc.connect_by_config(self.config) + + + def args(self, argv): + self.argparser = chainlib.eth.cli.ArgumentParser(chainlib.eth.cli.argflag_std_read) sub = self.argparser.add_subparsers() sub.dest = 'command' sub_user = sub.add_parser('user', aliases=['u'], help='retrieve transactions for a user') @@ -43,13 +70,8 @@ class CmdCtrl: self.cmd_args = self.argparser.parse_args(argv) - if logger == None: - logger = logging.getLogger() - if self.cmd_args.vv: - logger.setLevel(logging.DEBUG) - elif self.cmd_args.v: - logger.setLevel(logging.INFO) - + + def module(self): self.cmd_string = self.cmd_args.command cmd_string_translate = self.__cmd_alias.get(self.cmd_string) if cmd_string_translate != None: @@ -59,36 +81,36 @@ class CmdCtrl: raise ValueError('Subcommand missing') modname = 'clicada.cli.{}'.format(self.cmd_string) - logger.debug('using module {}'.format(modname)) + self.logger.debug('using module {}'.format(modname)) self.cmd_mod = importlib.import_module(modname) + + def logging(self, logger): + self.logger = logger + if self.logger == None: + self.logger = logging.getLogger() + if self.cmd_args.vv: + self.logger.setLevel(logging.DEBUG) + elif self.cmd_args.v: + self.logger.setLevel(logging.INFO) + + + def config(self): extra_args = self.cmd_mod.extra_args() - logger.debug('using extra args {}'.format(extra_args)) if self.cmd_args.config: self.config = chainlib.eth.cli.Config.from_args(self.cmd_args, base_config_dir=base_config_dir, extra_args=extra_args, override_dirs=self.cmd_args.c) else: self.config = chainlib.eth.cli.Config.from_args(self.cmd_args, base_config_dir=base_config_dir, extra_args=extra_args) - self.auth(self.get('AUTH_TYPE')) - self.config.add(False, '_SEQ') self.config.censor('AUTH_PASSPHRASE') - logger.debug('loaded config:\n{}'.format(self.config)) - - self.chain_spec = ChainSpec.from_chain_str(self.config.get('CHAIN_SPEC')) - - self.rpc = chainlib.eth.cli.Rpc() - self.__conn = self.rpc.connect_by_config(self.config) - - - self.remote_openers = {} - if self.get('META_URL') != None: - self.remote_openers['meta'] = HTTPSession(self.get('META_URL'), auth=self.__auth) + self.logger.debug('loaded config:\n{}'.format(self.config)) - def auth(self, typ): + def auth(self): + typ = self.get('AUTH_TYPE') if typ != 'gnupg': raise NotImplementedError('Valid aut implementations are: gnupg') default_auth_db_path = os.path.join(os.environ['HOME'], '.clicada/auth') diff --git a/clicada/cli/auth.py b/clicada/cli/auth.py index be02d95..5954eb9 100644 --- a/clicada/cli/auth.py +++ b/clicada/cli/auth.py @@ -5,7 +5,6 @@ import logging # external imports import gnupg -from usumbufu.client.hoba import HobaClientSession # local imports from clicada.error import AuthError @@ -13,10 +12,9 @@ from clicada.error import AuthError logg = logging.getLogger(__name__) -class PGPAuthCrypt(HobaClientSession): +class PGPAuthCrypt: typ = 'gnupg' - alg = '969' def __init__(self, db_dir, auth_key, pgp_dir=None): self.db_dir = db_dir @@ -41,9 +39,9 @@ class PGPAuthCrypt(HobaClientSession): h.update(bytes.fromhex(self.auth_key)) h.update(passphrase.encode('utf-8')) z = h.digest() - secret = self.gpg.encrypt(z, [self.auth_key]) + secret = self.gpg.encrypt(z, [self.auth_key], always_trust=True) if not secret.ok: - raise AuthError('could not encrypt secret for {}'.format(auth_key)) + raise AuthError('could not encrypt secret for {}'.format(self.auth_key)) d = os.path.dirname(p) os.makedirs(d, exist_ok=True) @@ -55,21 +53,23 @@ class PGPAuthCrypt(HobaClientSession): if not self.secret.ok: raise AuthError('could not decrypt encryption secret. wrong password?') f.close() + self.__passphrase = passphrase - def sign_auth_challenge(self, plaintext, hoba, encoding): - r = self.gpg.sign(plaintext, passphrase=self.passphrase, detach=True) - + def get_passphrase(self): + return self.__passphrase + + + def fingerprint(self): + return self.auth_key + + + def sign(self, plaintext, encoding, passphrase='', detach=True): + r = self.gpg.sign(plaintext, passphrase=passphrase, detach=detach) + if len(r.data) == 0: + raise AuthError('signing failed: ' + r.status) + if encoding == 'base64': r = r.data - hoba.signature = r - return str(hoba) - - - def __str__(self): - return 'clicada hoba/pgp auth' - - - def __repr__(self): - return 'clicada hoba/pgp auth' + return r diff --git a/clicada/cli/http.py b/clicada/cli/http.py index f5eebde..387aa5a 100644 --- a/clicada/cli/http.py +++ b/clicada/cli/http.py @@ -10,10 +10,37 @@ from usumbufu.client.base import ( BaseTokenStore, ) from usumbufu.client.bearer import BearerClientSession +from usumbufu.client.hoba import HobaClientSession logg = logging.getLogger(__name__) +class PGPClientSession(HobaClientSession): + + alg = '969' + + def __init__(self, auth): + self.auth = auth + self.origin = None + self.fingerprint = self.auth.fingerprint() + + + def sign_auth_challenge(self, plaintext, hoba, encoding): + passphrase = self.auth.get_passphrase() + r = self.auth.sign(plaintext, encoding, passphrase=passphrase, detach=True) + + hoba.signature = r + return str(hoba) + + + def __str__(self): + return 'clicada hoba/pgp auth' + + + def __repr__(self): + return 'clicada hoba/pgp auth' + + class HTTPSession: token_dir = '/run/user/{}/clicada/usumbufu/.token'.format(os.getuid()) @@ -21,21 +48,25 @@ class HTTPSession: def __init__(self, url, auth=None): logg.debug('auth auth {}'.format(auth)) self.base_url = url - url_parts = urllib.parse.urlparse(self.base_url) - self.origin = urllib.parse.urljoin(url_parts[0], url_parts[1]) + url_parts = urllib.parse.urlsplit(self.base_url) + url_parts_origin = (url_parts[0], url_parts[1], '', '', '',) + self.origin = urllib.parse.urlunsplit(url_parts_origin) h = hashlib.sha256() h.update(self.base_url.encode('utf-8')) z = h.digest() token_store_dir = os.path.join(self.token_dir, z.hex()) + os.makedirs(token_store_dir, exist_ok=True) self.token_store = BaseTokenStore(path=token_store_dir) self.session = ClientSession(self.origin, token_store=self.token_store) + bearer_handler = BearerClientSession(self.origin, token_store=self.token_store) self.session.add_subhandler(bearer_handler) if auth != None: + auth.origin = self.origin self.session.add_subhandler(auth) self.opener = urllib.request.build_opener(self.session) diff --git a/clicada/user/file.py b/clicada/user/file.py index 99e4ae0..3eeda13 100644 --- a/clicada/user/file.py +++ b/clicada/user/file.py @@ -9,7 +9,6 @@ import datetime # external imports from hexathon import strip_0x from cic_types.condiments import MetadataPointer -from cic_types.ext.metadata import MetadataRequestsHandler from cic_types.models.person import Person from cic_types.ext.requests import make_request from cic_types.processor import generate_metadata_pointer @@ -53,6 +52,14 @@ class Account(Person): return o + def __str__(self): + return '{} {} ({})'.format( + self.given_name, + self.family_name, + self.tel, + ) + + class FileUserStore: def __init__(self, metadata_opener, chain_spec, label, store_base_path, ttl): @@ -183,14 +190,14 @@ class FileUserStore: except ExpiredRecordError as e: logg.info(e) pass - + + getter = self.metadata_opener + ptr = generate_metadata_pointer(phone.encode('utf-8'), MetadataPointer.PHONE) r = None user_address = None try: - ptr = generate_metadata_pointer(phone.encode('utf-8'), MetadataPointer.PHONE) - url = urllib.parse.urljoin(MetadataRequestsHandler.base_url, ptr) - r = make_request('GET', url) - user_address = r.json() + r = getter.open(ptr) + user_address = json.loads(r) except requests.exceptions.HTTPError as e: logg.debug('no address found for phone {}: {}'.format(phone, e)) return None