Add pgp auth, subject metadata summary in cli user output

This commit is contained in:
nolash 2021-11-06 10:34:40 +01:00
parent 76b331178c
commit bdfdd0fdd7
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
7 changed files with 118 additions and 18 deletions

View File

@ -12,6 +12,7 @@ from chainlib.chain import ChainSpec
# local imports # local imports
import clicada.cli.user as cmd_user import clicada.cli.user as cmd_user
import clicada.cli.tag as cmd_tag import clicada.cli.tag as cmd_tag
from clicada.cli.auth import PGPAuthCrypt
script_dir = os.path.dirname(os.path.realpath(__file__)) script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, '..', 'data') data_dir = os.path.join(script_dir, '..', 'data')
@ -25,13 +26,12 @@ class CmdCtrl:
't': 'tag', 't': 'tag',
} }
def __init__(self, argv=None, description=None, logger=None, *args, **kwargs): __auth_for = [
#self.argparser = argparse.ArgumentParser(description=description, *args, **kwargs) 'user',
self.argparser = chainlib.eth.cli.ArgumentParser(chainlib.eth.cli.argflag_std_read) ]
#self.argparser.add_argument('-c', type=str, help='Configuration override directory path') def __init__(self, argv=None, description=None, logger=None, *args, **kwargs):
#self.argparser.add_argument('-v', action='store_true', help='Be verbose') self.argparser = chainlib.eth.cli.ArgumentParser(chainlib.eth.cli.argflag_std_read)
#self.argparser.add_argument('-vv', action='store_true', help='Be very verbose')
sub = self.argparser.add_subparsers() sub = self.argparser.add_subparsers()
sub.dest = 'command' sub.dest = 'command'
@ -68,6 +68,8 @@ class CmdCtrl:
else: else:
self.config = chainlib.eth.cli.Config.from_args(self.cmd_args, base_config_dir=base_config_dir, extra_args=extra_args) self.config = chainlib.eth.cli.Config.from_args(self.cmd_args, base_config_dir=base_config_dir, extra_args=extra_args)
self.__auth = self.auth(self.get('AUTH_TYPE'))
self.config.add(False, '_SEQ') self.config.add(False, '_SEQ')
logger.debug('loaded config:\n{}'.format(self.config)) logger.debug('loaded config:\n{}'.format(self.config))
@ -77,8 +79,18 @@ class CmdCtrl:
self.__conn = self.rpc.connect_by_config(self.config) self.__conn = self.rpc.connect_by_config(self.config)
def get(self, k):
r = self.config.get(k) def auth(self, typ):
if typ != 'gnupg':
raise NotImplementedError('Valid aut implementations are: gnupg')
default_auth_db_path = os.path.join(os.environ['HOME'], '.clicada/auth')
auth_db_path = self.get('AUTH_DB_PATH', default_auth_db_path)
self.__auth = PGPAuthCrypt(auth_db_path, self.get('AUTH_KEY'), self.get('AUTH_KEYRING_PATH'))
self.__auth.get_secret(self.get('AUTH_PASSPHRASE'))
def get(self, k, default=None):
r = self.config.get(k, default)
if k in [ if k in [
'_FORCE', '_FORCE',
]: ]:

53
clicada/cli/auth.py Normal file
View File

@ -0,0 +1,53 @@
# standard imports
import os
import hashlib
import logging
# external imports
import gnupg
# local imports
from clicada.error import AuthError
logg = logging.getLogger(__name__)
class PGPAuthCrypt:
def __init__(self, db_dir, auth_key, pgp_dir=None):
self.db_dir = db_dir
try:
bytes.fromhex(auth_key)
except TypeError:
raise AuthError('invalid key {}'.format(auth_key))
except ValueError:
raise AuthError('invalid key {}'.format(auth_key))
self.auth_key = auth_key
self.gpg = gnupg.GPG(gnupghome=pgp_dir)
def get_secret(self, passphrase=''):
if passphrase == None:
passphrase = ''
p = os.path.join(self.db_dir, '.secret')
try:
f = open(p, 'rb')
except FileNotFoundError:
h = hashlib.sha256()
h.update(bytes.fromhex(self.auth_key))
h.update(passphrase.encode('utf-8'))
z = h.digest()
secret = self.gpg.encrypt(z, [self.auth_key])
if not secret.ok:
raise AuthError('could not encrypt secret for {}'.format(auth_key))
d = os.path.dirname(p)
os.makedirs(d, exist_ok=True)
f = open(p, 'wb')
f.write(secret.data)
f.close()
f = open(p, 'rb')
self.secret = self.gpg.decrypt_file(f, passphrase=passphrase)
if not self.secret.ok:
raise AuthError('could not decrypt encryption secret. wrong password?')
f.close()

View File

@ -1,11 +1,13 @@
# standard imports # standard imports
import sys import sys
import logging import logging
import datetime
# external imports # external imports
from cic_eth_registry import CICRegistry from cic_eth_registry import CICRegistry
from cic_eth_registry.lookup.tokenindex import TokenIndexLookup from cic_eth_registry.lookup.tokenindex import TokenIndexLookup
from cic_types.ext.metadata import MetadataRequestsHandler from cic_types.ext.metadata import MetadataRequestsHandler
from cic_types.models.person import Person
from chainlib.eth.address import to_checksum_address from chainlib.eth.address import to_checksum_address
# local imports # local imports
@ -29,7 +31,7 @@ def extra_args():
'force_update': '_FORCE', 'force_update': '_FORCE',
'method': 'META_LOOKUP_METHOD', 'method': 'META_LOOKUP_METHOD',
'meta_url': 'META_URL', 'meta_url': 'META_URL',
'identifier': '_ARG_USER_IDENTIFIER', 'identifier': '_IDENTIFIER',
} }
@ -51,18 +53,17 @@ def execute(ctrl):
user_phone_file_label = 'phone' user_phone_file_label = 'phone'
user_phone_store = FileUserStore(ctrl.chain(), user_phone_file_label, store_path, int(ctrl.get('FILESTORE_TTL'))) user_phone_store = FileUserStore(ctrl.chain(), user_phone_file_label, store_path, int(ctrl.get('FILESTORE_TTL')))
user_address = user_phone_store.by_phone(ctrl.get('_ARG_USER_IDENTIFIER'), update=ctrl.get('_FORCE')) user_address = user_phone_store.by_phone(ctrl.get('_IDENTIFIER'), update=ctrl.get('_FORCE'))
if user_address == None: if user_address == None:
sys.stderr.write('unknown identifier: {}\n'.format(ctrl.get('_ARG_USER_IDENTIFIER'))) sys.stderr.write('unknown identifier: {}\n'.format(ctrl.get('_IDENTIFIER')))
sys.exit(1) sys.exit(1)
try: try:
user_address = to_checksum_address(user_address) user_address = to_checksum_address(user_address)
except ValueError: except ValueError:
sys.stderr.write('invalid response "{}" for {}\n'.format(user_address, ctrl.get('_ARG_USER_IDENTIFIER'))) sys.stderr.write('invalid response "{}" for {}\n'.format(user_address, ctrl.get('_IDENTIFIER')))
sys.exit(1) sys.exit(1)
logg.debug('loaded user address {} for {}'.format(user_address, ctrl.get('_IDENTIFIER')))
logg.debug('loaded user address {} for {}'.format(user_address, ctrl.get('_ARG_USER_IDENTIFIER')))
txs = tx_getter.get(user_address) txs = tx_getter.get(user_address)
@ -71,6 +72,27 @@ def execute(ctrl):
user_address_file_label = 'address' user_address_file_label = 'address'
user_address_store = FileUserStore(ctrl.chain(), user_address_file_label, store_path, int(ctrl.get('FILESTORE_TTL'))) user_address_store = FileUserStore(ctrl.chain(), user_address_file_label, store_path, int(ctrl.get('FILESTORE_TTL')))
r = user_address_store.by_address(user_address)
print("""Phone: {}
EVM address: {}
Name: {}
Registered: {}
Gender: {}
Location: {}
Products: {}
""".format(
ctrl.get('_IDENTIFIER'),
user_address,
str(r),
datetime.datetime.fromtimestamp(r.date_registered).ctime(),
r.gender,
r.location['area_name'],
','.join(r.products),
)
)
for tx_src in txs['data']: for tx_src in txs['data']:
tx = ResolvedTokenTx.from_dict(tx_src) tx = ResolvedTokenTx.from_dict(tx_src)
tx.resolve(token_store, user_address_store, update=ctrl.get('_FORCE')) tx.resolve(token_store, user_address_store, update=ctrl.get('_FORCE'))

View File

@ -10,3 +10,10 @@ cache_url =
[cic] [cic]
registry_address = registry_address =
[auth]
type = gnupg
db_path =
keyring_path =
key =
passphrase =

View File

@ -1,2 +1,6 @@
class ExpiredRecordError(Exception): class ExpiredRecordError(Exception):
pass pass
class AuthError(Exception):
pass

View File

@ -69,14 +69,16 @@ class ResolvedTokenTx(TokenTx):
return v return v
if self.tx_type == TokenTxType.faucet_giveto.value: if self.tx_type == TokenTxType.faucet_giveto.value:
return 'FAUCET' return 'FAUCET'
return user_store.get_label(self.sender) r = user_store.by_address(self.sender)
return str(r)
def resolve_recipient_entity(self, user_store, update=False): def resolve_recipient_entity(self, user_store, update=False):
v = self.resolve_stored_entity(user_store, self.recipient, update=update) v = self.resolve_stored_entity(user_store, self.recipient, update=update)
if v != None: if v != None:
return v return v
return user_store.get_label(self.recipient, update=update) r = user_store.by_address(self.recipient, update=update)
return str(r)
def resolve_entities(self, user_store, update=False): def resolve_entities(self, user_store, update=False):

View File

@ -169,7 +169,7 @@ class FileUserStore:
return user_address return user_address
def get_label(self, address, update=False): def by_address(self, address, update=False):
add = tx_normalize.wallet_address(address) add = tx_normalize.wallet_address(address)
ignore_expired = self.sticky(address) ignore_expired = self.sticky(address)
@ -202,4 +202,4 @@ class FileUserStore:
person = Person() person = Person()
person_data = person.deserialize(person_data=data) person_data = person.deserialize(person_data=data)
self.put(address, json.dumps(person_data.serialize())) self.put(address, json.dumps(person_data.serialize()))
return str(person_data) return person_data