Compare commits

...

48 Commits

Author SHA1 Message Date
6ac1dc88d2 chore: small cleanup 2022-05-02 08:45:32 +03:00
a3a3f55234 fix: disable notify 2022-05-02 08:23:55 +03:00
6e637ce04e Release v0.1.1 2022-05-01 09:19:21 +03:00
0db8e56603 fix: do not hard fail on missing meta data 2022-05-01 09:18:45 +03:00
1dba2d09d1 feat: add ability to query user using address 2022-04-27 14:36:25 +03:00
618aa7716e fix: bad conditional 2022-04-27 12:43:22 +03:00
17ae29887f fix: bump dependencies 2022-04-27 10:12:28 +03:00
fbb5168b40 Release v0.0.7 2022-03-31 12:40:41 +03:00
c269318528 Merge pull request 'fix: make store path relative to $HOME' (#17) from lum/store_path into master
Reviewed-on: #17
2022-03-31 09:34:47 +00:00
c3e8883f15 fix: make store path relative to $HOME 2022-03-31 12:33:29 +03:00
5f62287913
docs: add gpg instructions and example
note: private key in example is publicly available in cic-stack
2022-02-23 19:38:35 +03:00
28d7699a4a Merge pull request 'docs: add readme' (#16) from sohail/docs into master
Reviewed-on: #16
2022-02-14 07:01:46 +00:00
05224a9dd6
docs: add readme 2022-02-02 18:14:31 +03:00
2f65aa37ff Merge pull request 'Release 0.0.6' (#13) from lash/0.0.6 into master
release: v0.0.6
2022-01-26 06:24:39 +00:00
lash
be7dea24ac
Release 0.0.6 2022-01-24 17:03:54 +00:00
65b3d4d409 Merge pull request 'feat: Add cache encryption' (#9) from lash/encrypt into master
Reviewed-on: #9
2022-01-23 06:59:16 +00:00
lash
0bfe054b90
Auto-complete origin on missing port, magic shutil terminal size 2022-01-21 14:38:06 +00:00
lash
fe410e0fc6 Merge remote-tracking branch 'origin/master' into lash/encrypt 2022-01-21 11:29:41 +00:00
lash
3663665d91
Update packaging and deps 2022-01-21 11:28:22 +00:00
b0f8f39d15 usability: WIP Friendly progress output (#4)
This MR adds colorized progress statements when resolving and retrieving data for user. It disables logging if loglevel is set to default (logging.WARNING at this time).

It also skips metadata lookups that have failed in the same session, speeding up retrievals when same address repeatedly occurs in transaction list.

closes #6
closes #5

Co-authored-by: lash <dev@holbrook.no>
Reviewed-on: #4
Co-authored-by: lash <accounts-grassrootseconomics@holbrook.no>
Co-committed-by: lash <accounts-grassrootseconomics@holbrook.no>
2022-01-21 11:11:23 +00:00
lash
140df0d1c6
Add pycryptodome dep 2022-01-21 11:09:17 +00:00
lash
c5b4c41db0
Bump version 2022-01-21 11:04:17 +00:00
lash
d302c5754c
Add AES CTR with key as iv 2022-01-21 10:59:27 +00:00
lash
36b4fcab93
Add back crypt module 2022-01-21 10:15:43 +00:00
lash
f17e31d801
Return address on shortcircuited dud lookup 2022-01-21 10:13:11 +00:00
lash
64c7fa950c
Clean pollution from encrypt branch 2022-01-21 10:06:29 +00:00
lash
b057cb65ff
WIP add AES to local cache 2022-01-21 10:03:01 +00:00
lash
ce084bcb48
Bump version 2022-01-20 14:09:41 +00:00
lash
b7ea579aa5
Upgrade deps 2022-01-20 14:09:14 +00:00
nolash
6988c8c2b9
Bump version 2022-01-01 10:11:17 +00:00
nolash
7af7b78995
Remove stray arg object print 2022-01-01 10:10:14 +00:00
nolash
b552c933d7
Ensure all changes for home config dir override are applied to version 2021-12-17 07:50:01 +00:00
nolash
3739038340
Bump version 2021-11-15 14:42:08 +01:00
nolash
cacd8dbab8
Add settable origin to http session instantiation 2021-11-11 16:38:05 +01:00
nolash
5a337db2b0
Add packaging files 2021-11-10 09:20:25 +01:00
nolash
99156a1dba
Add phone numbers to listings 2021-11-07 03:52:52 +01:00
nolash
d8957e89c9
Update to dev prereleases for usumbufu, cic-types 2021-11-06 15:34:51 +01:00
nolash
be3bf81148
Rehabilitate tagger 2021-11-06 15:09:27 +01:00
nolash
6f1bf1ba1f
Proper classmethod parent invoke for account/person 2021-11-06 14:51:52 +01:00
nolash
0902d0c9b2
Extend serialization and deserialization with tags for person 2021-11-06 14:39:30 +01:00
nolash
8568e352be
Add usumbufu hoba capable opener object 2021-11-06 14:04:29 +01:00
nolash
e3c002082b
Add custom user info, decimals to values 2021-11-06 11:37:21 +01:00
nolash
f0ba197b08
Add balances 2021-11-06 10:59:49 +01:00
nolash
bdfdd0fdd7
Add pgp auth, subject metadata summary in cli user output 2021-11-06 10:34:40 +01:00
nolash
76b331178c
Add tagger 2021-11-06 06:29:45 +01:00
nolash
f648724205
Add local record expiry 2021-11-06 04:42:18 +01:00
nolash
e8b6173ec5
Add phone number resolution to address for cli arg identifier 2021-11-05 20:01:51 +01:00
nolash
14e0c1fdb4
Add cic-types to deps 2021-11-05 18:55:22 +01:00
23 changed files with 1114 additions and 147 deletions

4
.gitignore vendored
View File

@ -2,3 +2,7 @@ __pycache__
*.egg-info
build/
*.pyc
.venv
.clicada
dist/
.vscode/

View File

@ -1,3 +1,16 @@
- 0.0.1
- 0.0.7
* fix: make store_path relative to the users home
- 0.0.6
* Add cache encryption, with AES-CTR-128
- 0.0.5
* Replace logs with colorized progress output on default loglevel
* Do not repeat already failed metadata lookups
- 0.0.4
* Resolve metadata to labels when loading from cache
- 0.0.3
* Upgrade usumbufu to prevent missing bearer auth on https
- 0.0.2
* Use ~/.config for default config override
- 0.0.1-unreleased
* Add tx listing
* Add phone to user resolver

1
MANIFEST.in Normal file
View File

@ -0,0 +1 @@
include clicada/data/config/* *requirements* LICENSE

87
README.md Normal file
View File

@ -0,0 +1,87 @@
## Clicada
> Admin Command Line Interface to interact with cic-meta and cic-cache
### Pre-requisites
- Public key uploaded to `cic-auth-helper`
- PGP Keyring for your key
### Installation
Use either of the following installation methods:
1. Install from git release (recommended)
```bash
wget https://git.grassecon.net/grassrootseconomics/clicada/archive/v0.0.6.zip
unzip clicada-v0.0.6.zip
cd clicada
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt --extra-index-url=https://pip.grassrootseconomics.net
```
2. Install from pip to path (non sudo)
```bash
pip3 install -UI --extra-index-url=https://pip.grassrootseconomics.net clicada
```
### GPG Keyring setup
PGP uses the default keyring, you can however pass in a custom keyring path.
To create a keyring from a specific key and get its path for `AUTH_KEYRING_PATH`:
```bash
# In some dir
gpg --homedir .gnupg --import private.pgp
pwd
```
### Usage
```bash
usage: clicada [...optional arguments] [...positional arguments]
positional arguments:
{user,u,tag,t}
user (u) retrieve transactions for a user
tag (t) locally assign a display value to an identifier
optional arguments:
-h, --help show this help message and exit
--no-logs Turn off all logging
-v Be verbose
-vv Be very verbose
-c CONFIG, --config CONFIG
Configuration directory
-n NAMESPACE, --namespace NAMESPACE
Configuration namespace
--dumpconfig {env,ini}
Output configuration and quit. Use with --raw to omit values and output schema only.
--env-prefix ENV_PREFIX
environment prefix for variables to overwrite configuration
-p P, --rpc-provider P
RPC HTTP(S) provider url
--rpc-dialect RPC_DIALECT
RPC HTTP(S) backend dialect
--height HEIGHT Block height to execute against
-i I, --chain-spec I Chain specification string
-u, --unsafe Do not verify address checksums
--seq Use sequential rpc ids
-y Y, --key-file Y Keystore file to use for signing or address
--raw Do not decode output
--fee-price FEE_PRICE
override fee price
--fee-limit FEE_LIMIT
override fee limit
```
### Example
```bash
AUTH_PASSPHRASE=queenmarlena AUTH_KEYRING_PATH=/home/kamikaze/grassroots/usumbufu/tests/testdata/pgp/.gnupg/ AUTH_KEY=CCE2E1D2D0E36ADE0405E2D0995BB21816313BD5 CHAIN_SPEC=evm:byzantium:8996:bloxberg CIC_REGISTRY_ADDRESS=0xcf60ebc445b636a5ab787f9e8bc465a2a3ef8299 RPC_PROVIDER=https://rpc.grassecon.net TX_CACHE_URL=https://cache.grassecon.net HTTP_CORS_ORIGIN=https://auth.grassecon.net META_HTTP_ORIGIN=https://auth.grassecon.net:443 PYTHONPATH=. python clicada/runnable/view.py u --meta-url https://auth.grassecon.net +254711000000
```

View File

@ -1,89 +1,207 @@
# import notifier
from clicada.cli.notify import NotifyWriter
notifier = NotifyWriter()
# notifier.notify('loading script')
import importlib
import logging
# standard imports
import os
#import argparse
import logging
import importlib
import sys
# external imports
import confini
import chainlib.eth.cli
from chainlib.chain import ChainSpec
import clicada.cli.tag as cmd_tag
# local imports
import clicada.cli.user as cmd_user
# external imports
import confini
from chainlib.chain import ChainSpec
from clicada.cli.auth import PGPAuthCrypt
from clicada.cli.http import HTTPSession, PGPClientSession
from clicada.crypt.aes import AESCTREncrypt
logg = logging.getLogger()
script_dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(script_dir, '..', 'data')
base_config_dir = os.path.join(data_dir, 'config')
data_dir = os.path.join(script_dir, "..", "data")
base_config_dir = os.path.join(data_dir, "config")
class NullWriter:
def notify(self, v):
pass
def ouch(self, v):
pass
def write(self, v):
sys.stdout.write(str(v))
class CmdCtrl:
__cmd_alias = {
'u': 'user',
}
"u": "user",
"t": "tag",
}
__auth_for = [
"user",
]
def __init__(self, argv=None, description=None, logger=None, *args, **kwargs):
#self.argparser = argparse.ArgumentParser(description=description, *args, **kwargs)
self.argparser = chainlib.eth.cli.ArgumentParser(chainlib.eth.cli.argflag_std_read)
self.args(argv)
#self.argparser.add_argument('-c', type=str, help='Configuration override directory path')
#self.argparser.add_argument('-v', action='store_true', help='Be verbose')
#self.argparser.add_argument('-vv', action='store_true', help='Be very verbose')
self.logging(logger)
self.module()
self.config()
self.notifier()
self.auth()
self.blockchain()
self.remote_openers = {}
if self.get("META_URL") != None:
auth_client_session = PGPClientSession(self.__auth)
self.remote_openers["meta"] = HTTPSession(
self.get("META_URL"),
auth=auth_client_session,
origin=self.config.get("META_HTTP_ORIGIN"),
)
def blockchain(self):
self.chain_spec = ChainSpec.from_chain_str(self.config.get("CHAIN_SPEC"))
self.rpc = chainlib.eth.cli.Rpc()
self.__conn = self.rpc.connect_by_config(self.config)
def args(self, argv):
self.argparser = chainlib.eth.cli.ArgumentParser(
chainlib.eth.cli.argflag_std_read
)
sub = self.argparser.add_subparsers()
sub.dest = 'command'
sub_user = sub.add_parser('user', aliases=['u'], help='retrieve transactions for a user')
sub.dest = "command"
sub_user = sub.add_parser(
"user", aliases=["u"], help="retrieve transactions for a user"
)
cmd_user.process_args(sub_user)
sub_tag = sub.add_parser(
"tag", aliases=["t"], help="locally assign a display value to an identifier"
)
cmd_tag.process_args(sub_tag)
self.cmd_args = self.argparser.parse_args(argv)
if logger == None:
logger = logging.getLogger()
if self.cmd_args.vv:
logger.setLevel(logging.DEBUG)
elif self.cmd_args.v:
logger.setLevel(logging.INFO)
def module(self):
self.cmd_string = self.cmd_args.command
cmd_string_translate = self.__cmd_alias.get(self.cmd_string)
if cmd_string_translate != None:
self.cmd_string = cmd_string_translate
if self.cmd_string == None:
raise ValueError('Subcommand missing')
modname = 'clicada.cli.{}'.format(self.cmd_string)
logger.debug('using module {}'.format(modname))
self.cmd_string = "none"
modname = "clicada.cli.{}".format(self.cmd_string)
self.logger.debug("using module {}".format(modname))
self.cmd_mod = importlib.import_module(modname)
# if self.cmd_args.c:
# self.config = confini.Config(base_config_dir, override_dirs=self.cmd_args.c)
# else:
# self.config = confini.Config(base_config_dir)
def logging(self, logger):
self.logger = logger
if self.logger == None:
self.logger = logging.getLogger()
if self.cmd_args.vv:
self.logger.setLevel(logging.DEBUG)
elif self.cmd_args.v:
self.logger.setLevel(logging.INFO)
def config(self):
override_dir = self.cmd_args.config
if override_dir == None:
p = os.environ.get("HOME")
if p != None:
p = os.path.join(p, ".config", "cic", "clicada")
try:
os.stat(p)
override_dir = p
logg.info(
"applying user config override from standard location: {}".format(
p
)
)
except FileNotFoundError:
pass
extra_args = self.cmd_mod.extra_args()
logger.debug('using extra args {}'.format(extra_args))
if self.cmd_args.config:
self.config = chainlib.eth.cli.Config.from_args(self.cmd_args, base_config_dir=base_config_dir, extra_args=extra_args, override_dirs=self.cmd_args.c)
else:
self.config = chainlib.eth.cli.Config.from_args(self.cmd_args, base_config_dir=base_config_dir, extra_args=extra_args)
self.config = chainlib.eth.cli.Config.from_args(
self.cmd_args,
base_config_dir=base_config_dir,
extra_args=extra_args,
default_config_dir=override_dir,
)
self.config.add(False, '_SEQ')
logger.debug('loaded config:\n{}'.format(self.config))
self.config.add(False, "_SEQ")
self.chain_spec = ChainSpec.from_chain_str(self.config.get('CHAIN_SPEC'))
self.config.censor("AUTH_PASSPHRASE")
self.rpc = chainlib.eth.cli.Rpc()
self.__conn = self.rpc.connect_by_config(self.config)
self.logger.debug("loaded config:\n{}".format(self.config))
def auth(self):
typ = self.get("AUTH_TYPE")
if typ != "gnupg":
raise NotImplementedError("Valid aut implementations are: gnupg")
default_auth_db_path = None
if os.environ.get("HOME") != None:
default_auth_db_path = os.path.join(
os.environ["HOME"], ".local/share/cic/clicada"
)
auth_db_path = self.get("AUTH_DB_PATH", default_auth_db_path)
self.__auth = PGPAuthCrypt(
auth_db_path, self.get("AUTH_KEY"), self.get("AUTH_KEYRING_PATH")
)
self.__auth.get_secret(self.get("AUTH_PASSPHRASE"))
self.encrypter = AESCTREncrypt(auth_db_path, self.__auth.secret)
def get(self, k):
return self.config.get(k)
def get(self, k, default=None):
r = self.config.get(k, default)
if k in [
"_FORCE",
]:
if r == None:
return False
return self.config.true(k)
return r
def chain(self):
return self.chain_spec
def conn(self):
return self.__conn
def execute(self):
self.cmd_mod.execute(self)
def opener(self, k):
return self.remote_openers[k]
def notifier(self):
if logg.root.level >= logging.WARNING:
logging.disable()
self.writer = notifier
else:
self.writer = NullWriter()
def notify(self, v):
if logg.root.level <= logging.INFO:
print("\033[96m" + v + "\033[0m")
def ouch(self, v):
print("\033[91m" + v + "\033[0m")
def write(self, v):
print(v)

77
clicada/cli/auth.py Normal file
View File

@ -0,0 +1,77 @@
# standard imports
import os
import hashlib
import logging
# external imports
import gnupg
# local imports
from clicada.error import AuthError
logg = logging.getLogger(__name__)
class PGPAuthCrypt:
typ = 'gnupg'
def __init__(self, db_dir, auth_key, pgp_dir=None):
self.db_dir = db_dir
try:
bytes.fromhex(auth_key)
except TypeError:
raise AuthError('invalid key {}'.format(auth_key))
except ValueError:
raise AuthError('invalid key {}'.format(auth_key))
self.auth_key = auth_key
self.gpg = gnupg.GPG(gnupghome=pgp_dir)
self.secret = None
def get_secret(self, passphrase=''):
if passphrase == None:
passphrase = ''
p = os.path.join(self.db_dir, '.secret')
try:
f = open(p, 'rb')
except FileNotFoundError:
h = hashlib.sha256()
h.update(bytes.fromhex(self.auth_key))
h.update(passphrase.encode('utf-8'))
z = h.digest()
secret = self.gpg.encrypt(z, [self.auth_key], always_trust=True)
if not secret.ok:
raise AuthError('could not encrypt secret for {}'.format(self.auth_key))
d = os.path.dirname(p)
os.makedirs(d, exist_ok=True)
f = open(p, 'wb')
f.write(secret.data)
f.close()
f = open(p, 'rb')
secret = self.gpg.decrypt_file(f, passphrase=passphrase)
if not secret.ok:
raise AuthError('could not decrypt encryption secret. wrong password?')
f.close()
self.secret = secret.data
self.__passphrase = passphrase
def get_passphrase(self):
return self.__passphrase
def fingerprint(self):
return self.auth_key
def sign(self, plaintext, encoding, passphrase='', detach=True):
r = self.gpg.sign(plaintext, passphrase=passphrase, detach=detach)
if len(r.data) == 0:
raise AuthError('signing failed: ' + r.status)
if encoding == 'base64':
r = r.data
return r

95
clicada/cli/http.py Normal file
View File

@ -0,0 +1,95 @@
# standard imports
import hashlib
import urllib.parse
import os
import logging
from socket import getservbyname
# external imports
from usumbufu.client.base import (
ClientSession,
BaseTokenStore,
)
from usumbufu.client.bearer import BearerClientSession
from usumbufu.client.hoba import HobaClientSession
logg = logging.getLogger(__name__)
class PGPClientSession(HobaClientSession):
alg = '969'
def __init__(self, auth):
self.auth = auth
self.origin = None
self.fingerprint = self.auth.fingerprint()
def sign_auth_challenge(self, plaintext, hoba, encoding):
passphrase = self.auth.get_passphrase()
r = self.auth.sign(plaintext, encoding, passphrase=passphrase, detach=True)
hoba.signature = r
return str(hoba)
def __str__(self):
return 'clicada hoba/pgp auth'
def __repr__(self):
return 'clicada hoba/pgp auth'
class HTTPSession:
token_dir = '/run/user/{}/clicada/usumbufu/.token'.format(os.getuid())
def __init__(self, url, auth=None, origin=None):
self.base_url = url
url_parts = urllib.parse.urlsplit(self.base_url)
url_parts_origin_host = url_parts[1].split(":")
host = url_parts_origin_host[0]
try:
host = host + ':' + url_parts_origin_host[1]
except IndexError:
host = host + ':' + str(getservbyname(url_parts[0]))
logg.info('changed origin with missing port number from {} to {}'.format(url_parts[1], host))
url_parts_origin = (url_parts[0], host, '', '', '',)
self.origin = origin
if self.origin == None:
self.origin = urllib.parse.urlunsplit(url_parts_origin)
else:
logg.debug('overriding http origin for {} with {}'.format(url, self.origin))
h = hashlib.sha256()
h.update(self.base_url.encode('utf-8'))
z = h.digest()
token_store_dir = os.path.join(self.token_dir, z.hex())
os.makedirs(token_store_dir, exist_ok=True)
self.token_store = BaseTokenStore(path=token_store_dir)
self.session = ClientSession(self.origin, token_store=self.token_store)
bearer_handler = BearerClientSession(self.origin, token_store=self.token_store)
self.session.add_subhandler(bearer_handler)
if auth != None:
auth.origin = self.origin
self.session.add_subhandler(auth)
self.opener = urllib.request.build_opener(self.session)
def open(self, endpoint):
url = urllib.parse.urljoin(self.base_url, endpoint)
logg.debug('open {} with opener {}'.format(url, self))
r = self.opener.open(url)
return r.read().decode('utf-8')
def __str__(self):
return str(self.session)

14
clicada/cli/none.py Normal file
View File

@ -0,0 +1,14 @@
def process_args(argparser):
pass
def extra_args():
return {}
def apply_args(config, args):
pass
def validate(config, args):
pass
def execute(ctrl):
raise ValueError('subcommand missing')

33
clicada/cli/notify.py Normal file
View File

@ -0,0 +1,33 @@
# standard imports
import os
import sys
import shutil
class NotifyWriter:
def __init__(self, writer=sys.stdout):
(c, r) = shutil.get_terminal_size()
self.cols = c
self.fmt = "\r{:" + "<{}".format(c) + "}"
self.w = writer
self.notify_max = self.cols - 4
def notify(self, v):
if len(v) > self.notify_max:
v = v[:self.notify_max]
self.write('\x1b[0;36m... ' + v + '\x1b[0;39m')
def ouch(self, v):
if len(v) > self.notify_max:
v = v[:self.notify_max]
self.write('\x1b[0;91m!!! ' + v + '\x1b[0;39m')
def write(self, v):
s = str(v)
if len(s) > self.cols:
s = s[:self.cols]
self.w.write(self.fmt.format(s))

42
clicada/cli/tag.py Normal file
View File

@ -0,0 +1,42 @@
# standard imports
import json
# external imports
from clicada.user import FileUserStore
from pathlib import Path
import os
categories = [
'phone',
'address',
]
def process_args(argparser):
argparser.add_argument('--category', required=True, type=str, help='Identifier category')
argparser.add_argument('identifier', type=str, help='Identifier to store a display tag for')
argparser.add_argument('tag', type=str, help='Display tag to store for the identifier')
def extra_args():
return {
'category': None,
'identifier': None,
'tag': None,
}
def apply_args(config, args):
pass
def validate(config, args):
if category not in categories:
raise ValueError('Invalid category. Valid categories are: {}'.format(','.join(categories)))
def execute(ctrl):
store_path = os.path.join(str(Path.home()), '.clicada')
user_store = FileUserStore(None, ctrl.chain(), ctrl.get('_CATEGORY'), store_path, int(ctrl.get('FILESTORE_TTL')))
user_store.put(ctrl.get('_IDENTIFIER'), json.dumps(ctrl.get('_TAG')), force=True)
user_store.stick(ctrl.get('_IDENTIFIER'))

View File

@ -1,24 +1,165 @@
# standard imports
import datetime
import logging
import os
import sys
from pathlib import Path
from chainlib.encode import TxHexNormalizer
from chainlib.eth.address import is_address, to_checksum_address
# external imports
from cic_eth_registry import CICRegistry
from cic_eth_registry.lookup.tokenindex import TokenIndexLookup
from cic_types.models.person import Person
from clicada.error import MetadataNotFoundError
from clicada.token import FileTokenStore, token_balance
# local imports
from clicada.tx import ResolvedTokenTx, TxGetter
from clicada.user import FileUserStore
from hexathon import add_0x
logg = logging.getLogger(__name__)
tx_normalizer = TxHexNormalizer()
def process_args(argparser):
argparser.add_argument('-m', '--method', type=str, help='lookup method')
argparser.add_argument('--meta-url', dest='meta_url', type=str, help='url to retrieve metadata from')
argparser.add_argument('identifier', type=str, help='user identifier')
argparser.add_argument("-m", "--method", type=str, help="lookup method")
argparser.add_argument(
"--meta-url", dest="meta_url", type=str, help="Url to retrieve metadata from"
)
argparser.add_argument(
"-f",
"--force-update",
dest="force_update",
action="store_true",
help="Update records of mutable entries",
)
argparser.add_argument(
"identifier", type=str, help="user identifier (phone_number or address)"
)
def extra_args():
return {
'method': 'META_LOOKUP_METHOD',
'meta_url': 'META_URL',
'identifier': '_ARG_USER_IDENTIFIER',
}
def validate_args(args):
pass
"force_update": "_FORCE",
"method": "META_LOOKUP_METHOD",
"meta_url": "META_URL",
"identifier": "_IDENTIFIER",
}
def apply_args(config, args):
if config.get("META_LOOKUP_METHOD"):
raise NotImplementedError(
'Sorry, currently only "phone" lookup method is implemented'
)
def validate(config, args):
pass
def execute(config, args):
pass
def execute(ctrl):
tx_getter = TxGetter(ctrl.get("TX_CACHE_URL"), 10)
store_path = os.path.join(str(Path.home()), ".clicada")
user_phone_file_label = "phone"
user_phone_store = FileUserStore(
ctrl.opener("meta"),
ctrl.chain(),
user_phone_file_label,
store_path,
int(ctrl.get("FILESTORE_TTL")),
encrypter=ctrl.encrypter,
)
identifier = ctrl.get("_IDENTIFIER")
ctrl.notify("resolving identifier {} to wallet address".format(identifier))
if is_address(identifier):
user_address = identifier
else:
user_address = user_phone_store.by_phone(identifier, update=ctrl.get("_FORCE"))
if user_address == None:
ctrl.ouch("unknown identifier: {}\n".format(identifier))
sys.exit(1)
try:
user_address = to_checksum_address(user_address)
except ValueError:
ctrl.ouch('invalid response "{}" for {}\n'.format(user_address, identifier))
sys.exit(1)
logg.debug("loaded user address {} for {}".format(user_address, identifier))
user_address_normal = tx_normalizer.wallet_address(user_address)
ctrl.notify("retrieving txs for address {}".format(user_address_normal))
txs = tx_getter.get(user_address)
token_store = FileTokenStore(ctrl.chain(), ctrl.conn(), "token", store_path)
user_address_file_label = "address"
user_address_store = FileUserStore(
ctrl.opener("meta"),
ctrl.chain(),
user_address_file_label,
store_path,
int(ctrl.get("FILESTORE_TTL")),
encrypter=ctrl.encrypter,
)
r = None
ctrl.write(
f"""
Phone: {ctrl.get("_IDENTIFIER")}
Network address: {add_0x(user_address)}
Chain: {ctrl.chain().common_name()}"""
)
ctrl.notify("resolving metadata for address {}".format(user_address_normal))
try:
r = user_address_store.by_address(
user_address_normal, update=ctrl.get("_FORCE")
)
if r:
ctrl.write(
f"""
Name: { str(r)}
Registered: {datetime.datetime.fromtimestamp(r.date_registered).ctime()}
Gender: {r.gender}
Location: {r.location["area_name"]}
Products: {",".join(r.products)}
Tags: {",".join(r.tags)}"""
)
except MetadataNotFoundError as e:
ctrl.ouch(f"MetadataNotFoundError: Could not resolve metadata for user {e}\n")
tx_lines = []
seen_tokens = {}
for tx_src in txs["data"]:
ctrl.notify("resolve details for tx {}".format(tx_src["tx_hash"]))
tx = ResolvedTokenTx.from_dict(tx_src)
tx.resolve(
token_store,
user_address_store,
show_decimals=True,
update=ctrl.get("_FORCE"),
)
tx_lines.append(tx)
seen_tokens[tx.source_token_label] = tx.source_token
seen_tokens[tx.destination_token_label] = tx.destination_token
for k in seen_tokens.keys():
ctrl.notify("resolve token {}".format(seen_tokens[k]))
(token_symbol, token_decimals) = token_store.by_address(seen_tokens[k])
ctrl.notify(
"get token balance for {} => {}".format(token_symbol, seen_tokens[k])
)
balance = token_balance(ctrl.chain(), ctrl.conn(), seen_tokens[k], user_address)
fmt = "{:." + str(token_decimals) + "f}"
decimal_balance = fmt.format(balance / (10**token_decimals))
ctrl.write("Balances:\n {} {}".format(token_symbol, decimal_balance))
print()
for l in tx_lines:
ctrl.write(l)

42
clicada/crypt/aes.py Normal file
View File

@ -0,0 +1,42 @@
# standard imports
import os
import logging
import hashlib
from Crypto.Cipher import AES
from Crypto.Util import Counter
from .base import Encrypter
logg = logging.getLogger(__name__)
class AESCTREncrypt(Encrypter):
aes_block_size = 1 << 7
counter_bytes = int(128 / 8)
def __init__(self, db_dir, secret):
self.secret = secret
def key_to_iv(self, k):
h = hashlib.sha256()
h.update(k.encode('utf-8'))
h.update(self.secret)
z = h.digest()
return int.from_bytes(z[:self.counter_bytes], 'big')
def encrypt(self, k, v):
iv = self.key_to_iv(k)
ctr = Counter.new(self.aes_block_size, initial_value=iv)
cipher = AES.new(self.secret, AES.MODE_CTR, counter=ctr)
return cipher.encrypt(v)
def decrypt(self, k, v):
iv = self.key_to_iv(k)
ctr = Counter.new(self.aes_block_size, initial_value=iv)
cipher = AES.new(self.secret, AES.MODE_CTR, counter=ctr)
return cipher.decrypt(v)

8
clicada/crypt/base.py Normal file
View File

@ -0,0 +1,8 @@
class Encrypter:
def encrypt(self, v):
raise NotImplementedError()
def decrypt(self, v):
raise NotImplementedError()

View File

@ -1,9 +1,21 @@
[meta]
lookup_method = phone
url =
http_origin =
[filestore]
ttl = 86400
[tx]
cache_url =
http_origin =
[cic]
registry_address =
[auth]
type = gnupg
db_path =
keyring_path =
key =
passphrase =

10
clicada/error.py Normal file
View File

@ -0,0 +1,10 @@
class ExpiredRecordError(Exception):
pass
class AuthError(Exception):
pass
class MetadataNotFoundError(Exception):
pass

View File

@ -2,47 +2,18 @@
import sys
import logging
# external imports
from cic_eth_registry import CICRegistry
from cic_eth_registry.lookup.tokenindex import TokenIndexLookup
from cic_types.ext.metadata import MetadataRequestsHandler
# local imports
from clicada.tx import TxGetter
from clicada.user import FileUserStore
from clicada.token import FileTokenStore
from clicada.cli import CmdCtrl
from clicada.tx import ResolvedTokenTx
logging.basicConfig(level=logging.WARNING)
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
ctrl = CmdCtrl(argv=sys.argv[1:], logger=logg)
tx_getter = TxGetter(ctrl.get('TX_CACHE_URL'))
def main():
ctrl.execute()
store_path = '.clicada'
user_phone_file_label = 'phone'
user_phone_store = FileUserStore(ctrl.chain(), user_phone_file_label, store_path)
try:
user_phone_store.put('25413241324', '5c4f9EeE1a6375d30f50ab547cD4EE21B37ac8Ab')
except FileExistsError:
logg.debug('already have record')
k = '25413241324'
user_address = user_phone_store.get(k)
logg.debug('loaded user address {} for {}'.format(user_address, k))
txs = tx_getter.get(user_address)
token_store = FileTokenStore(ctrl.chain(), ctrl.conn(), 'token', store_path)
MetadataRequestsHandler.base_url = ctrl.get('META_URL')
user_address_file_label = 'address'
user_address_store = FileUserStore(ctrl.chain(), user_address_file_label, store_path)
for tx_src in txs['data']:
tx = ResolvedTokenTx.from_dict(tx_src)
tx.resolve(token_store, user_address_store)
print(tx)
if __name__ == '__main__':
main()

View File

@ -1 +1,2 @@
from .token import *
from .util import token_balance

10
clicada/token/util.py Normal file
View File

@ -0,0 +1,10 @@
# external imports
from eth_erc20 import ERC20
from chainlib.eth.constant import ZERO_ADDRESS
def token_balance(chain_spec, conn, token_address, wallet_address, caller_address=ZERO_ADDRESS):
c = ERC20(chain_spec)
o = c.balance(token_address, wallet_address, sender_address=caller_address)
r = conn.do(o)
return c.parse_balance(r)

View File

@ -12,6 +12,10 @@ from cic_types.models.tx import (
# local imports
from clicada.encode import tx_normalize
from clicada.error import (
ExpiredRecordError,
MetadataNotFoundError,
)
logg = logging.getLogger(__name__)
@ -22,71 +26,70 @@ class ResolvedTokenTx(TokenTx):
super(ResolvedTokenTx, self).__init__()
self.source_token_name = None
self.destination_token_name = None
self.source_token_decimals = None
self.destination_token_decimals = None
self.symmetric = True
self.sender_entity = None
self.recipient_entity = None
def resolve_tokens(self, token_store, show_decimals=False):
def resolve_tokens(self, token_store, show_decimals=False, update=False):
(token_symbol, token_decimals) = token_store.by_address(self.source_token)
self.source_token_decimals = token_decimals
self.source_token_label = token_symbol
token_value = self.to_value / (10 ** token_decimals)
show_token_decimals = token_decimals
if not show_decimals:
token_decimals = 0
fmt = '{:.' + str(token_decimals) + 'f}'
show_token_decimals = 0
fmt = '{:.' + str(show_token_decimals) + 'f}'
self.from_value_label = fmt.format(token_value)
if self.destination_token != self.source_token:
self.symmetric = False
(token_symbol, token_decimals) = token_store.by_address(self.destination_token)
show_token_decimals = token_decimals
self.destination_token_label = token_symbol
self.destination_token_decimals = token_decimals
token_value = self.to_value / (10 ** token_decimals)
if not show_decimals:
token_decimals = 0
fmt = '{:.' + str(token_decimals) + 'f}'
show_token_decimals = 0
fmt = '{:.' + str(show_token_decimals) + 'f}'
self.to_value_label = fmt.format(token_value)
def resolve_stored_entity(self, user_store, address):
address = tx_normalize.wallet_address(address)
def resolve_entity(self, user_store, address):
try:
v = user_store.get(address)
return v
except FileNotFoundError:
return None
r = user_store.by_address(address)
except MetadataNotFoundError:
return address
return str(r)
def resolve_sender_entity(self, user_store):
v = self.resolve_stored_entity(user_store, self.sender)
if v != None:
return v
def resolve_sender_entity(self, user_store, update=False):
if self.tx_type == TokenTxType.faucet_giveto.value:
return 'FAUCET'
return user_store.get_metadata(self.sender)
return self.resolve_entity(user_store, self.sender)
def resolve_recipient_entity(self, user_store, update=False):
return self.resolve_entity(user_store, self.recipient)
def resolve_recipient_entity(self, user_store):
v = self.resolve_stored_entity(user_store, self.recipient)
if v != None:
return v
return user_store.get_metadata(self.recipient)
def resolve_entities(self, user_store, update=False):
self.sender_label = self.resolve_sender_entity(user_store, update=update)
self.recipient_label = self.resolve_recipient_entity(user_store, update=update)
def resolve_entities(self, user_store):
self.sender_label = self.resolve_sender_entity(user_store)
self.recipient_label = self.resolve_recipient_entity(user_store)
def resolve(self, token_store, user_store, show_decimals=False):
self.resolve_tokens(token_store, show_decimals)
self.resolve_entities(user_store)
def resolve(self, token_store, user_store, show_decimals=False, update=False):
self.resolve_tokens(token_store, show_decimals, update=update)
self.resolve_entities(user_store, update=update)
def __str__(self):
if self.symmetric:
return '{}: {} => {}\t{} {}'.format(
return '{} {} => {} {} {}'.format(
self.date_block_label,
self.sender_label,
self.recipient_label,

View File

@ -2,24 +2,73 @@
import stat
import os
import logging
import json
import urllib.parse
import datetime
# external imports
from hexathon import strip_0x
from cic_types.condiments import MetadataPointer
from cic_types.ext.metadata import MetadataRequestsHandler
from cic_types.models.person import Person
import requests.exceptions
from cic_types.ext.requests import make_request
from cic_types.processor import generate_metadata_pointer
from requests.exceptions import HTTPError
import phonenumbers
# local imports
from clicada.encode import tx_normalize
from clicada.store.mem import MemDictStore
from clicada.error import (
ExpiredRecordError,
MetadataNotFoundError,
)
logg = logging.getLogger(__name__)
class Account(Person):
def __init__(self):
super(Account, self).__init__()
self.tags = []
def apply_custom(self, custom_data):
self.tags = custom_data['tags']
logg.debug('tags are now {}'.format(self.tags))
@classmethod
def deserialize(cls, person_data):
o = super(Account, cls).deserialize(person_data)
try:
o.tags = person_data['custom']['tags']
except KeyError as e:
pass
return o
def serialize(self):
o = super(Account, self).serialize()
o['custom'] = {}
o['custom']['tags'] = self.tags
return o
def __str__(self):
return '{} {} ({})'.format(
self.given_name,
self.family_name,
self.tel,
)
class FileUserStore:
def __init__(self, chain_spec, label, store_base_path):
def __init__(self, metadata_opener, chain_spec, label, store_base_path, ttl, encrypter=None):
invalidate_before = datetime.datetime.now() - datetime.timedelta(seconds=ttl)
self.invalidate_before = int(invalidate_before.timestamp())
self.have_xattr = False
self.label = label
self.store_path = os.path.join(
store_base_path,
@ -31,6 +80,9 @@ class FileUserStore:
self.memstore_entity = MemDictStore()
os.makedirs(self.store_path, exist_ok=True)
self.__validate_dir()
self.metadata_opener = metadata_opener
self.failed_entities = {}
self.encrypter = encrypter
def __validate_dir(self):
@ -39,43 +91,201 @@ class FileUserStore:
logg.debug('using existing file store {} for {}'.format(self.store_path, self.label))
def is_dud(self, address):
return bool(self.failed_entities.get(address))
def put(self, k, v, force=False):
have_file = False
p = os.path.join(self.store_path, k)
its_time = True
try:
st = os.stat(p)
have_file = True
its_time = st[stat.ST_MTIME] < self.invalidate_before
except FileNotFoundError:
pass
if have_file and not force:
if have_file and not its_time and not force:
raise FileExistsError('user resolution already exists for {}'.format(k))
f = open(p, 'w')
f.write(v)
ve = v
f = None
if self.encrypter != None:
ve = self.encrypter.encrypt(k, ve.encode('utf-8'))
f = open(p, 'wb')
else:
f = open(p, 'w')
f.write(ve)
f.close()
logg.info('added user store {} record {} -> {}'.format(self.label, k, v))
def get(self, k):
def stick(self, k):
p = os.path.join(self.store_path, k)
f = open(p, 'r')
r = f.read()
if self.have_xattr:
os.setxattr(p, 'cic_sticky', b'1')
else:
(s_h, s_t) = os.path.split(p)
sp = os.path.join(s_h, '.stick_' + s_t)
f = open(sp, 'w')
f.close()
def __is_sticky(self, p):
if self.have_xattr:
if not os.getxattr(p, 'cic_sticky'):
return False
else:
(s_h, s_t) = os.path.split(p)
sp = os.path.join(s_h, '.stick_' + s_t)
try:
os.stat(sp)
except FileNotFoundError:
return False
return True
def __unstick(self, p):
(s_h, s_t) = os.path.split(p)
sp = os.path.join(s_h, '.stick_' + s_t)
os.unlink(sp)
def path(self, k):
return os.path.join(self.store_path, k)
def sticky(self, k):
p = self.path(k)
return self.__is_sticky(p)
def check_expiry(self, p):
st = os.stat(p)
if st[stat.ST_MTIME] < self.invalidate_before:
if self.__is_sticky(p):
logg.debug('ignoring sticky record expiry for {}'.format(p))
return
raise ExpiredRecordError('record in {} is expired'.format(p))
def get(self, k, ignore_expired=False):
try:
p = os.path.join(self.store_path, k)
except FileNotFoundError as e:
if self.__is_sticky(p):
logg.warning('removed orphaned sticky record for ' + p)
self.__unstick(p)
self.check_expiry(p)
f = None
if self.encrypter != None:
f = open(p, 'rb')
else:
f = open(p, 'r')
v = f.read()
f.close()
return r.strip()
if self.encrypter != None:
v = self.encrypter.decrypt(k, v)
logg.debug('>>>>>>>>>>>>< v decoded {}'.format(v))
v = v.decode('utf-8')
logg.debug('retrieved {} from {}'.format(k, p))
return v.strip()
def get_metadata(self, address):
def by_phone(self, phone, update=False):
phone = phonenumbers.parse(phone, None)
phone = phonenumbers.format_number(phone, phonenumbers.PhoneNumberFormat.E164)
phone_file = phone.replace('+', '')
ignore_expired = self.sticky(phone_file)
if not update:
try:
v = self.get(phone_file, ignore_expired=ignore_expired)
return v
except FileNotFoundError:
pass
except ExpiredRecordError as e:
logg.info(e)
pass
getter = self.metadata_opener
ptr = generate_metadata_pointer(phone.encode('utf-8'), MetadataPointer.PHONE)
r = None
user_address = None
try:
r = getter.open(ptr)
user_address = json.loads(r)
except HTTPError as e:
logg.debug('no address found for phone {}: {}'.format(phone, e))
return None
self.put(phone_file, user_address, force=update)
return user_address
def metadata_to_person(self, v):
person = Account()
try:
person_data = person.deserialize(person_data=v)
except Exception as e:
person_data = v
return person_data
def by_address(self, address, update=False):
address = tx_normalize.wallet_address(address)
address = strip_0x(address)
getter = MetadataRequestsHandler(MetadataPointer.PERSON, bytes.fromhex(address))
#if self.failed_entities.get(address):
if self.is_dud(address):
logg.debug('already tried and failed {}, skipping'.format(address))
return address
ignore_expired = self.sticky(address)
if not update:
try:
v = self.get(address, ignore_expired=ignore_expired)
v = json.loads(v)
return self.metadata_to_person(v)
except FileNotFoundError:
pass
except ExpiredRecordError as e:
logg.info(e)
pass
getter = self.metadata_opener
ptr = generate_metadata_pointer(bytes.fromhex(address), MetadataPointer.PERSON)
r = None
try:
r = getter.query()
except requests.exceptions.HTTPError as e:
r = getter.open(ptr)
except Exception as e:
logg.debug('no metadata found for {}: {}'.format(address, e))
return address
data = r.json()
person = Person()
if not r:
self.failed_entities[address] = True
raise MetadataNotFoundError()
data = json.loads(r)
person = Account()
person_data = person.deserialize(person_data=data)
return str(person_data)
ptr = generate_metadata_pointer(bytes.fromhex(address), MetadataPointer.CUSTOM)
r = None
try:
r = getter.open(ptr)
o = json.loads(r)
person_data.apply_custom(o)
except Exception as e:
pass
self.put(address, json.dumps(person_data.serialize()), force=update)
return person_data

View File

@ -1,3 +1,10 @@
usumbufu~=0.3.2
confini~=0.5.1
cic-eth-registry~=0.6.1
usumbufu~=0.3.8
confini~=0.6.0
cic-eth-registry~=0.6.9
cic-types~=0.2.2
phonenumbers==8.12.12
eth-erc20~=0.3.0
hexathon~=0.1.0
pycryptodome~=3.10.1
chainlib-eth~=0.1.0
chainlib~=0.1.0

37
setup.cfg Normal file
View File

@ -0,0 +1,37 @@
[metadata]
name = clicada
version = 0.1.3
description = CLI CRM tool for the cic-stack custodial wallet system
author = Louis Holbrook
author_email = dev@holbrook.no
url = https://gitlab.com/chaintools/chainlib
keywords =
dlt
evm
blockchain
cryptocurrency
classifiers =
Programming Language :: Python :: 3
Operating System :: OS Independent
Development Status :: 3 - Alpha
Environment :: Console
Intended Audience :: Developers
License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)
Topic :: Internet
license = GPL3
licence_files =
LICENSE.txt
[options]
python_requires = >= 3.6
include_package_data = True
packages =
clicada
clicada.runnable
clicada.store
clicada.token
clicada.cli
clicada.tx
clicada.user
clicada.crypt

31
setup.py Normal file
View File

@ -0,0 +1,31 @@
from setuptools import setup
import os
requirements = []
f = open('requirements.txt', 'r')
while True:
l = f.readline()
if l == '':
break
requirements.append(l.rstrip())
f.close()
#test_requirements = []
#f = open('test_requirements.txt', 'r')
#while True:
# l = f.readline()
# if l == '':
# break
# eth_requirements.append(l.rstrip())
#f.close()
setup(
install_requires=requirements,
entry_points={
'console_scripts': [
'clicada=clicada.runnable.view:main',
],
},
)