clicada/clicada/cli/user.py

256 lines
9.0 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# standard imports
import sys
import logging
import datetime
import threading
from queue import SimpleQueue as Queue
# external imports
from cic_eth_registry import CICRegistry
from cic_eth_registry.lookup.tokenindex import TokenIndexLookup
from cic_types.models.person import Person
from chainlib.eth.address import to_checksum_address
from chainlib.encode import TxHexNormalizer
from hexathon import (
add_0x,
strip_0x,
)
# local imports
from clicada.tx import TxGetter
from clicada.user import FileUserStore
from clicada.token import (
FileTokenStore,
token_balance,
)
from clicada.tx import ResolvedTokenTx
from clicada.tx.file import FileTxStore
from clicada.error import MetadataNotFoundError
logg = logging.getLogger(__name__)
tx_normalizer = TxHexNormalizer()
def process_args(argparser):
    """Register the command line arguments understood by the user command.

    :param argparser: Argument parser (or any object exposing a compatible add_argument method) to add the arguments to.
    """
    # Each entry is (option strings, keyword arguments), registered in this exact order.
    argument_specs = (
        (('-m', '--method'), {'type': str, 'help': 'lookup method'}),
        (('--meta-url',), {'dest': 'meta_url', 'type': str, 'help': 'Url to retrieve metadata from'}),
        (('-f', '--force-update'), {'dest': 'force_update', 'action': 'store_true', 'help': 'Update records of mutable entries'}),
        (('-ff', '--force-update-all'), {'dest': 'force_update_all', 'action': 'store_true', 'help': 'Update records of mutable entries and immutable entries'}),
        (('--raw-tx',), {'dest': 'raw_tx', 'action': 'store_true', 'help': 'Also cache raw transaction data'}),
        (('identifier',), {'type': str, 'help': 'user identifier'}),
    )
    for option_strings, options in argument_specs:
        argparser.add_argument(*option_strings, **options)
def extra_args():
    """Map argument destinations of this command to their configuration keys.

    :returns: Dictionary keyed by argparse destination name, with the configuration key each value is stored under.
    """
    return dict(
        raw_tx='_RAW_TX',
        force_update='_FORCE',
        force_update_all='_FORCE_ALL',
        method='META_LOOKUP_METHOD',
        meta_url='META_URL',
        identifier='_IDENTIFIER',
    )
def apply_args(config, args):
    """Check that the requested metadata lookup method is supported.

    Only the "phone" lookup method is implemented. An unset (falsy) method is
    accepted, and so is an explicit "phone"; any other value is rejected.

    :param config: Configuration object exposing get(key); the META_LOOKUP_METHOD key is read.
    :param args: Parsed command line arguments (not used here).
    :raises NotImplementedError: If a lookup method other than "phone" is requested.
    """
    method = config.get('META_LOOKUP_METHOD')
    # Explicitly asking for the one implemented method must not be rejected.
    if method and method != 'phone':
        raise NotImplementedError('Sorry, currently only "phone" lookup method is implemented')
def validate(config, args):
    """Validate configuration and arguments for the user command.

    Currently a no-op: nothing is checked and nothing is raised.

    :param config: Configuration object (not used here).
    :param args: Parsed command line arguments (not used here).
    :returns: None
    """
    return None
def _tx_sort_key(tx):
    """Build a numerically ordered sort key for a resolved transaction.

    :param tx: Resolved transaction exposing block_number and tx_hash attributes.
    :returns: Tuple of (block number as int, tx hash). The block number falls back to -1 when it cannot be converted to an integer.
    """
    try:
        block_number = int(tx.block_number)
    except (TypeError, ValueError):
        block_number = -1
    return (block_number, tx.tx_hash)


def execute(ctrl):
    """Resolve a user identifier and print the user's metadata, token balances and transactions.

    The identifier is resolved to a wallet address through the phone store, the
    metadata for that address is printed, and then one TxResolverWorker thread
    per transaction plus a single TokenResolverWorker thread are used to resolve
    transaction details and token balances. Balances are written first, followed
    by the transactions ordered by descending block number.

    The process exits with status 1 if the identifier is unknown, if the
    resolved address is not valid, or if no metadata is found for the address.

    :param ctrl: Command controller. The get, opener, chain, conn, notify, ouch and write methods and the encrypter and rpc attributes are used.
    """
    identifier = ctrl.get('_IDENTIFIER')
    force_update = ctrl.get('_FORCE')
    filestore_ttl = int(ctrl.get('FILESTORE_TTL'))

    # NOTE(review): the meaning of 50 is defined by TxGetter; presumably a result limit — confirm.
    tx_getter = TxGetter(ctrl.get('TX_CACHE_URL'), 50)

    store_path = '.clicada'
    user_phone_store = FileUserStore(ctrl.opener('meta'), ctrl.chain(), 'phone', store_path, filestore_ttl, encrypter=ctrl.encrypter, notifier=ctrl)

    ctrl.notify('resolving identifier {} to wallet address'.format(identifier))
    user_address = user_phone_store.by_phone(identifier, update=force_update)
    if user_address is None:
        ctrl.ouch('unknown identifier: {}\n'.format(identifier))
        sys.exit(1)

    try:
        user_address = to_checksum_address(user_address)
    except ValueError:
        ctrl.ouch('invalid response "{}" for {}\n'.format(user_address, identifier))
        sys.exit(1)

    logg.debug('loaded user address {} for {}'.format(user_address, identifier))

    user_address_normal = tx_normalizer.wallet_address(user_address)
    ctrl.notify('retrieving txs for address {}'.format(user_address_normal))
    txs = tx_getter.get(user_address)

    token_store = FileTokenStore(ctrl.chain(), ctrl.conn(), 'token', store_path)

    user_address_store = FileUserStore(ctrl.opener('meta'), ctrl.chain(), 'address', store_path, filestore_ttl, encrypter=ctrl.encrypter, notifier=ctrl)

    ctrl.notify('resolving metadata for address {}'.format(user_address_normal))
    try:
        r = user_address_store.by_address(user_address_normal, update=force_update)
    except MetadataNotFoundError as e:
        ctrl.ouch('could not resolve metadata for user: {}'.format(e))
        sys.exit(1)

    profile_template = '\n'.join((
        'Phone: {}',
        'Network address: {}',
        'Chain: {}',
        'Name: {}',
        'Registered: {}',
        'Gender: {}',
        'Location: {}',
        'Products: {}',
        'Tags: {}',
    ))
    ctrl.write(profile_template.format(
        identifier,
        add_0x(user_address),
        ctrl.chain().common_name(),
        str(r),
        datetime.datetime.fromtimestamp(r.date_registered).ctime(),
        r.gender,
        r.location['area_name'],
        ','.join(r.products),
        ','.join(r.tags),
    ))

    # The rpc handle is only passed to the tx store when raw tx caching was requested.
    raw_rpc = None
    if ctrl.get('_RAW_TX'):
        raw_rpc = ctrl.rpc
    tx_store = FileTxStore(store_path, rpc=raw_rpc, notifier=ctrl)

    token_resolver_queue = Queue()
    token_result_queue = Queue()
    tx_queue = Queue()

    token_resolver_worker = TokenResolverWorker(user_address, ctrl, token_store, token_resolver_queue, token_result_queue)
    token_resolver_worker.start()

    # One worker thread per transaction.
    tx_workers = []
    for tx_src in txs['data']:
        tx_hash = strip_0x(tx_src['tx_hash'])
        tx_worker = TxResolverWorker(tx_hash, tx_src, ctrl, tx_store, token_store, user_address_store, token_resolver_queue, tx_queue, show_decimals=True, update=force_update)
        tx_worker.start()
        tx_workers.append(tx_worker)

    # Join before reading results: the queues are unbounded so workers never
    # block on put, and a worker that died on an exception simply contributes
    # no result instead of leaving this thread blocked on a get that is never
    # satisfied.
    ctrl.notify('wait for transaction getters to finish work')
    for tx_worker in tx_workers:
        tx_worker.join()

    # Keyed by (block number, tx hash) so that identical entries collapse and
    # ordering is numeric rather than lexicographic.
    # tx_index is missing from the source data, so the hash is the tie-breaker.
    tx_buf = {}
    while not tx_queue.empty():
        tx = tx_queue.get()
        if tx is None:
            continue
        tx_buf[_tx_sort_key(tx)] = tx

    # All tx workers are done, so no more tokens will be queued; tell the
    # token resolver to finish and wait for it before draining its results.
    token_resolver_queue.put_nowait(None)
    ctrl.notify('wait for token resolver to finish work')
    token_resolver_worker.join()

    balance_lines = []
    while not token_result_queue.empty():
        entry = token_result_queue.get()
        if entry is None:
            break
        balance_lines.append(' {} {}\n'.format(entry[0], entry[1]))

    ctrl.write("Balances:")
    ctrl.write(''.join(balance_lines))
    ctrl.write('')

    # Most recent block first.
    for k in sorted(tx_buf, reverse=True):
        ctrl.write(tx_buf[k])
class TxResolverWorker(threading.Thread):
    """Thread that resolves a single transaction and stores it in the tx store.

    :param tx_hash: Transaction hash, used in notifications and as the tx store key.
    :param tx_src: Source transaction data, passed to ResolvedTokenTx.from_dict.
    :param ctrl: Command controller; its notify and get methods are used.
    :param tx_store: Store the stringified source transaction is put into.
    :param token_store: Token store passed on to the transaction resolver.
    :param user_address_store: User store passed on to the transaction resolver.
    :param token_queue: Queue receiving the source and destination token of the transaction.
    :param tx_queue: Queue receiving the resolved transaction.
    :param show_decimals: Passed on to the transaction resolver.
    :param update: Passed on to the transaction resolver.
    """

    def __init__(self, tx_hash, tx_src, ctrl, tx_store, token_store, user_address_store, token_queue, tx_queue, show_decimals=True, update=None):
        super().__init__()
        # what to resolve
        self.tx_hash = tx_hash
        self.tx_src = tx_src
        # collaborators
        self.ctrl = ctrl
        self.tx_store = tx_store
        self.token_store = token_store
        self.user_address_store = user_address_store
        # output channels
        self.token_queue = token_queue
        self.tx_queue = tx_queue
        # resolver options
        self.show_decimals = show_decimals
        self.update = update

    def run(self):
        """Resolve the transaction, queue its tokens, store it and queue the result.

        An already existing entry in the tx store is not an error; it is only
        logged at debug level.
        """
        self.ctrl.notify('resolve details for tx {}'.format(self.tx_hash))

        resolved_tx = ResolvedTokenTx.from_dict(self.tx_src)
        resolved_tx.resolve(
            self.token_store,
            self.user_address_store,
            show_decimals=self.show_decimals,
            update=self.update,
        )

        # Both tokens are handed to the token resolver.
        self.token_queue.put_nowait(resolved_tx.source_token)
        self.token_queue.put_nowait(resolved_tx.destination_token)

        try:
            serialized_src = str(self.tx_src)
            overwrite = self.ctrl.get('_FORCE_ALL')
            self.tx_store.put(self.tx_hash, serialized_src, overwrite=overwrite)
        except FileExistsError:
            logg.debug('skip put of already existing tx entry {}'.format(self.tx_hash))

        self.tx_queue.put(resolved_tx)
class TokenResolverWorker(threading.Thread):
    """Thread that resolves token symbols and the wallet's balance for each queued token.

    Token addresses are read from in_queue until a None entry is received.
    Every token address is resolved once only; for each one a tuple of
    (token symbol, formatted balance) is put on out_queue. A None entry is
    always put on out_queue when the thread ends, also when it ends because of
    an error, so that a consumer waiting for the end marker is not left blocked.

    :param wallet_address: Wallet address to get balances for; stored without 0x prefix.
    :param ctrl: Command controller; its notify, chain and conn methods are used.
    :param token_store: Token store exposing by_address, returning (symbol, decimals).
    :param in_queue: Queue of token addresses to resolve, terminated by None.
    :param out_queue: Queue receiving (symbol, balance string) tuples, terminated by None.
    """

    def __init__(self, wallet_address, ctrl, token_store, in_queue, out_queue):
        super(TokenResolverWorker, self).__init__()
        self.ctrl = ctrl
        self.token_store = token_store
        self.in_queue = in_queue
        self.out_queue = out_queue
        # token addresses already processed, keyed by address
        self.seen_tokens = {}
        self.wallet_address = strip_0x(wallet_address)

    @staticmethod
    def _format_balance(balance, decimals):
        """Render an integer base-unit balance as a fixed point decimal string.

        Integer arithmetic is used so that no digits are lost for values that
        exceed float precision.

        :param balance: Balance in the token's smallest unit.
        :param decimals: Number of decimal places of the token.
        :returns: Balance string with exactly `decimals` fractional digits, or no decimal point when decimals is 0.
        """
        value = int(balance)
        decimals = int(decimals)
        sign = '-' if value < 0 else ''
        if decimals == 0:
            return '{}{}'.format(sign, abs(value))
        whole, fraction = divmod(abs(value), 10 ** decimals)
        return '{}{}.{}'.format(sign, whole, str(fraction).zfill(decimals))

    def run(self):
        """Process queued token addresses until the None end marker is received."""
        try:
            while True:
                token_address = self.in_queue.get()
                if token_address is None:
                    logg.debug('token resolver end')
                    return

                token_address = strip_0x(token_address)
                if token_address in self.seen_tokens:
                    continue

                logg.debug('resolve token {}'.format(token_address))
                self.ctrl.notify('resolve token {}'.format(token_address))
                (token_symbol, token_decimals) = self.token_store.by_address(token_address)
                self.seen_tokens[token_address] = token_address

                self.ctrl.notify('get token balance for {} => {}'.format(token_symbol, token_address))
                balance = token_balance(self.ctrl.chain(), self.ctrl.conn(), token_address, self.wallet_address)
                decimal_balance = self._format_balance(balance, token_decimals)
                logg.debug('token balance for {} ({}) is {}'.format(token_symbol, token_address, decimal_balance))

                self.out_queue.put((token_symbol, decimal_balance,))
        finally:
            # The end marker must reach the consumer on every exit path.
            self.out_queue.put_nowait(None)