diff --git a/clicada/cli/arg.py b/clicada/cli/arg.py index 2ea2cb2..76cfad4 100644 --- a/clicada/cli/arg.py +++ b/clicada/cli/arg.py @@ -158,6 +158,10 @@ class CmdCtrl: r = self.config.get(k, default) if k in [ '_FORCE', + '_FORCE_ALL', + '_RAW_TX', + '_NO_RESOLVE', + '_NO_TX', ]: if r == None: return False diff --git a/clicada/cli/user.py b/clicada/cli/user.py index 92a0aa0..f74fc34 100644 --- a/clicada/cli/user.py +++ b/clicada/cli/user.py @@ -2,6 +2,7 @@ import sys import logging import datetime +from queue import SimpleQueue as Queue # external imports from cic_eth_registry import CICRegistry @@ -9,17 +10,26 @@ from cic_eth_registry.lookup.tokenindex import TokenIndexLookup from cic_types.models.person import Person from chainlib.eth.address import to_checksum_address from chainlib.encode import TxHexNormalizer -from hexathon import add_0x +from hexathon import ( + add_0x, + strip_0x, + ) # local imports -from clicada.tx import TxGetter -from clicada.user import FileUserStore -from clicada.token import ( - FileTokenStore, - token_balance, +from clicada.tx import ( + TxGetter, + FormattedTokenTx, ) +from clicada.user import FileUserStore +from clicada.token import FileTokenStore from clicada.tx import ResolvedTokenTx +from clicada.tx.file import FileTxStore from clicada.error import MetadataNotFoundError +from clicada.cli.worker import ( + MetadataResolverWorker, + TxResolverWorker, + TokenResolverWorker, + ) logg = logging.getLogger(__name__) @@ -30,21 +40,31 @@ def process_args(argparser): argparser.add_argument('-m', '--method', type=str, help='lookup method') argparser.add_argument('--meta-url', dest='meta_url', type=str, help='Url to retrieve metadata from') argparser.add_argument('-f', '--force-update', dest='force_update', action='store_true', help='Update records of mutable entries') + argparser.add_argument('-ff', '--force-update-all', dest='force_update_all', action='store_true', help='Update records of mutable entries and immutable entries') + argparser.add_argument('-N', '--no-resolve', dest='no_resolve', action='store_true', help='Resolve no metadata') + argparser.add_argument('--no-tx', dest='no_tx', action='store_true', help='Do not fetch transactions') + argparser.add_argument('--raw-tx', dest='raw_tx', action='store_true', help='Also cache raw transaction data') argparser.add_argument('identifier', type=str, help='user identifier') def extra_args(): return { + 'raw_tx': '_RAW_TX', 'force_update': '_FORCE', + 'force_update_all': '_FORCE_ALL', 'method': 'META_LOOKUP_METHOD', 'meta_url': 'META_URL', 'identifier': '_IDENTIFIER', + 'no_resolve': '_NO_RESOLVE', + 'no_tx': '_NO_TX', } def apply_args(config, args): if config.get('META_LOOKUP_METHOD'): raise NotImplementedError('Sorry, currently only "phone" lookup method is implemented') + if config.true('_FORCE_ALL'): + config.add(True, '_FORCE', exists_ok=True) def validate(config, args): @@ -52,11 +72,11 @@ def validate(config, args): def execute(ctrl): - tx_getter = TxGetter(ctrl.get('TX_CACHE_URL'), 10) + tx_getter = TxGetter(ctrl.get('TX_CACHE_URL'), 50) store_path = '.clicada' user_phone_file_label = 'phone' - user_phone_store = FileUserStore(ctrl.opener('meta'), ctrl.chain(), user_phone_file_label, store_path, int(ctrl.get('FILESTORE_TTL')), encrypter=ctrl.encrypter) + user_phone_store = FileUserStore(ctrl.opener('meta'), ctrl.chain(), user_phone_file_label, store_path, int(ctrl.get('FILESTORE_TTL')), encrypter=ctrl.encrypter, notifier=ctrl) ctrl.notify('resolving identifier {} to wallet address'.format(ctrl.get('_IDENTIFIER'))) user_address = user_phone_store.by_phone(ctrl.get('_IDENTIFIER'), update=ctrl.get('_FORCE')) @@ -72,61 +92,137 @@ def execute(ctrl): logg.debug('loaded user address {} for {}'.format(user_address, ctrl.get('_IDENTIFIER'))) user_address_normal = tx_normalizer.wallet_address(user_address) + ctrl.write("""Results for lookup by phone {}: + +Metadata: + Network address: {}""".format( + ctrl.get('_IDENTIFIER'), + add_0x(user_address), + ) + ) + + if not ctrl.get('_NO_RESOLVE'): + + token_store = FileTokenStore(ctrl.chain(), ctrl.conn(), 'token', store_path) + + user_address_file_label = 'address' + user_address_store = FileUserStore(ctrl.opener('meta'), ctrl.chain(), user_address_file_label, store_path, int(ctrl.get('FILESTORE_TTL')), encrypter=ctrl.encrypter, notifier=ctrl) + + ctrl.notify('resolving metadata for address {}'.format(user_address_normal)) + try: + r = user_address_store.by_address(user_address_normal, update=ctrl.get('_FORCE')) + except MetadataNotFoundError as e: + ctrl.ouch('could not resolve metadata for user: {}'.format(e)) + sys.exit(1) + + ctrl.write(""" Chain: {} + Name: {} + Registered: {} + Gender: {} + Location: {} + Products: {} + Tags: {}""".format( + ctrl.chain().common_name(), + str(r), + datetime.datetime.fromtimestamp(r.date_registered).ctime(), + r.gender, + r.location['area_name'], + ','.join(r.products), + ','.join(r.tags), + ) + ) + + if ctrl.get('_NO_TX'): + sys.exit(0) + + raw_rpc = None + if ctrl.get('_RAW_TX'): + raw_rpc = ctrl.rpc + ctrl.notify('retrieving txs for address {}'.format(user_address_normal)) txs = tx_getter.get(user_address) - token_store = FileTokenStore(ctrl.chain(), ctrl.conn(), 'token', store_path) + if ctrl.get('_NO_RESOLVE'): + for v in txs['data']: + tx = FormattedTokenTx.from_dict(v) + ctrl.write(tx) + sys.exit(0) - user_address_file_label = 'address' - user_address_store = FileUserStore(ctrl.opener('meta'), ctrl.chain(), user_address_file_label, store_path, int(ctrl.get('FILESTORE_TTL')), encrypter=ctrl.encrypter) - ctrl.notify('resolving metadata for address {}'.format(user_address_normal)) - try: - r = user_address_store.by_address(user_address_normal, update=ctrl.get('_FORCE')) - except MetadataNotFoundError as e: - ctrl.ouch('could not resolve metadata for user: {}'.format(e)) - sys.exit(1) + token_resolver_queue = Queue() + token_result_queue = Queue() + token_resolver_worker = TokenResolverWorker(user_address, ctrl, token_store, token_resolver_queue, token_result_queue) + token_resolver_worker.start() - ctrl.write("""Phone: {} -Network address: {} -Chain: {} -Name: {} -Registered: {} -Gender: {} -Location: {} -Products: {} -Tags: {}""".format( - ctrl.get('_IDENTIFIER'), - add_0x(user_address), - ctrl.chain().common_name(), - str(r), - datetime.datetime.fromtimestamp(r.date_registered).ctime(), - r.gender, - r.location['area_name'], - ','.join(r.products), - ','.join(r.tags), - ) -) + wallets = [] + for tx in txs['data']: + token_resolver_queue.put_nowait(tx['source_token']) + token_resolver_queue.put_nowait(tx['destination_token']) + if tx['sender'] not in wallets: + logg.info('adding wallet {} to metadata lookup'.format(tx['sender'])) + wallets.append(tx['sender']) + if tx['recipient'] not in wallets: + wallets.append(tx['recipient']) + logg.info('registered wallet {} for metadata lookup'.format(tx['recipient'])) - tx_lines = [] - seen_tokens = {} + wallet_threads = [] + for a in wallets: + thread_wallet = MetadataResolverWorker(a, ctrl, user_address_store) + thread_wallet.start() + wallet_threads.append(thread_wallet) + + ctrl.notify('wait for metadata resolvers to finish work') + for t in wallet_threads: + t.join() + + tx_store = FileTxStore(store_path, rpc=raw_rpc, notifier=ctrl) + tx_threads = [] + tx_queue = Queue() + + tx_n = 0 for tx_src in txs['data']: - ctrl.notify('resolve details for tx {}'.format(tx_src['tx_hash'])) - tx = ResolvedTokenTx.from_dict(tx_src) - tx.resolve(token_store, user_address_store, show_decimals=True, update=ctrl.get('_FORCE')) - tx_lines.append(tx) - seen_tokens[tx.source_token_label] = tx.source_token - seen_tokens[tx.destination_token_label] = tx.destination_token + tx_hash = strip_0x(tx_src['tx_hash']) + tx_worker = TxResolverWorker(tx_hash, tx_src, ctrl, tx_store, token_store, user_address_store, token_resolver_queue, tx_queue, show_decimals=True, update=ctrl.get('_FORCE')) + tx_thread = tx_worker.start() + tx_threads.append(tx_worker) + tx_n += 1 + + tx_buf = {} + + for i in range(0, tx_n): + tx = tx_queue.get() + if tx == None: + break + # ugh, ugly + #k = float('{}.{}'.format(tx.block_number, tx.tx_index)) + # tx_index is missing, this is temporary sort measure + k = str(tx.block_number) + '.' + tx.tx_hash + tx_buf[k] = tx + + ctrl.notify('wait for transaction getters to finish work') + for tx_thread in tx_threads: + tx_thread.join() + + token_resolver_queue.put_nowait(None) + + token_buf = '' + while True: + l = token_result_queue.get() + if l == None: + break + token_buf += ' {} {}\n'.format(l[0], l[1]) + + ctrl.notify('wait for token resolver to finish work') + token_resolver_worker.join() + + ctrl.write('') + ctrl.write("Balances:") + ctrl.write(token_buf) + ks = list(tx_buf.keys()) + ks.sort() + ks.reverse() + for k in ks: + ctrl.write(tx_buf[k]) + - for k in seen_tokens.keys(): - ctrl.notify('resolve token {}'.format(seen_tokens[k])) - (token_symbol, token_decimals) = token_store.by_address(seen_tokens[k]) - ctrl.notify('get token balance for {} => {}'.format(token_symbol, seen_tokens[k])) - balance = token_balance(ctrl.chain(), ctrl.conn(), seen_tokens[k], user_address) - fmt = '{:.' + str(token_decimals) + 'f}' - decimal_balance = fmt.format(balance / (10 ** token_decimals)) - ctrl.write("Balances:\n {} {}".format(token_symbol, decimal_balance)) - print() - for l in tx_lines: - ctrl.write(l) diff --git a/clicada/cli/worker.py b/clicada/cli/worker.py new file mode 100644 index 0000000..74a3375 --- /dev/null +++ b/clicada/cli/worker.py @@ -0,0 +1,87 @@ +# standard imports +import threading +import logging + +# external imports +from hexathon import strip_0x + +# local imports +from clicada.tx import ResolvedTokenTx +from clicada.token import token_balance +from clicada.error import MetadataNotFoundError + +logg = logging.getLogger(__name__) + + +class MetadataResolverWorker(threading.Thread): + + def __init__(self, wallet_address, ctrl, user_address_store): + self.user_address_store = user_address_store + self.wallet_address = wallet_address + self.ctrl = ctrl + super(MetadataResolverWorker, self).__init__() + + + def run(self): + self.ctrl.notify('resolve metadata for {}'.format(self.wallet_address)) + try: + self.user_address_store.by_address(self.wallet_address) + except MetadataNotFoundError: + logg.info('failed metadata lookup for {}'.format(self.wallet_address)) + + +class TxResolverWorker(threading.Thread): + def __init__(self, tx_hash, tx_src, ctrl, tx_store, token_store, user_address_store, token_queue, tx_queue, show_decimals=True, update=None): + self.tx_hash = tx_hash + self.tx_src = tx_src + self.ctrl = ctrl + self.token_store = token_store + self.user_address_store = user_address_store + self.show_decimals = show_decimals + self.update = update + self.token_queue = token_queue + self.tx_store = tx_store + self.tx_queue = tx_queue + super(TxResolverWorker, self).__init__() + + + def run(self): + self.ctrl.notify('resolve details for tx {}'.format(self.tx_hash)) + tx = ResolvedTokenTx.from_dict(self.tx_src) + tx.resolve(self.token_store, self.user_address_store, show_decimals=self.show_decimals, update=self.update, lookup=False) + self.tx_store.put(self.tx_hash, str(self.tx_src), overwrite=self.ctrl.get('_FORCE_ALL')) + self.tx_queue.put(tx) + + +class TokenResolverWorker(threading.Thread): + + def __init__(self, wallet_address, ctrl, token_store, in_queue, out_queue): + super(TokenResolverWorker, self).__init__() + self.ctrl = ctrl + self.token_store = token_store + self.in_queue = in_queue + self.out_queue = out_queue + self.seen_tokens = {} + self.wallet_address = strip_0x(wallet_address) + + + def run(self): + while True: + token_address = self.in_queue.get() + if token_address == None: + logg.debug('token resolver end') + self.out_queue.put_nowait(None) + return + token_address = strip_0x(token_address) + if self.seen_tokens.get(token_address) != None: + continue + logg.debug('resolve token {}'.format(token_address)) + self.ctrl.notify('resolve token {}'.format(token_address)) + (token_symbol, token_decimals) = self.token_store.by_address(token_address) + self.seen_tokens[token_address] = token_address + self.ctrl.notify('get token balance for {} => {}'.format(token_symbol, self.seen_tokens[token_address])) + balance = token_balance(self.ctrl.chain(), self.ctrl.conn(), self.seen_tokens[token_address], self.wallet_address) + fmt = '{:.' + str(token_decimals) + 'f}' + decimal_balance = fmt.format(balance / (10 ** token_decimals)) + logg.debug('token balance for {} ({}) is {}'.format(token_symbol, token_address, decimal_balance)) + self.out_queue.put((token_symbol, decimal_balance,)) diff --git a/clicada/token/token.py b/clicada/token/token.py index 3e44c9a..ef496d2 100644 --- a/clicada/token/token.py +++ b/clicada/token/token.py @@ -52,7 +52,7 @@ class FileTokenStore: return p - def by_address(self, address): + def by_address(self, address, update=False, lookup=True): address = tx_normalize.executable_address(address) token_symbol = self.memstore_symbol.get(address) @@ -65,17 +65,30 @@ class FileTokenStore: try: f = open(p, 'r') except FileNotFoundError: - p = self.__cache_token(address) - f = open(p, 'r') - - token_symbol = f.read() - f.close() + pass - p = os.path.join(self.store_path, token_symbol) - f = open(p, 'r') - r = f.read() - f.close() - token_decimals = int(r) + if f == None: + if not lookup: + token_symbol = '???' + token_decimals = '???' + #self.memstore_symbol.put(address, token_symbol) + #self.memstore_decimals.put(address, token_decimals) + #logg.warning('token metadata not found and lookup deactivated. Will use 18 decimals as default') + #return (token_symbol, token_decimals,) + + if token_symbol == None: + if f == None: + p = self.__cache_token(address) + f = open(p, 'r') + + token_symbol = f.read() + f.close() + + p = os.path.join(self.store_path, token_symbol) + f = open(p, 'r') + r = f.read() + f.close() + token_decimals = int(r) self.memstore_symbol.put(address, token_symbol) self.memstore_decimals.put(token_symbol, token_decimals) diff --git a/clicada/tx/file.py b/clicada/tx/file.py new file mode 100644 index 0000000..b795883 --- /dev/null +++ b/clicada/tx/file.py @@ -0,0 +1,45 @@ +# standard imports +import os +import logging + +# external imports +from chainlib.eth.tx import transaction +from leveldir.numeric import NumDir +from leveldir.hex import HexDir +from hexathon import strip_0x + +logg = logging.getLogger(__name__) + + +class FileTxStore: + + subdivision = 100000 + + def __init__(self, store_base_path, rpc=None, notifier=None): + tx_base_path = os.path.join(store_base_path, 'tx') + num_base_path = os.path.join(tx_base_path, 'blocks') + hash_base_path = os.path.join(tx_base_path, 'hash') + raw_base_path = os.path.join(tx_base_path, 'raw') + self.block_index_dir = NumDir(num_base_path) + self.hash_index_dir = HexDir(hash_base_path, 32) + self.raw_index_dir = HexDir(raw_base_path, 32) + self.rpc = rpc + self.notifier = notifier + + + def put(self, k, v, overwrite=False): + if self.notifier != None: + self.notifier.notify('caching tx data for {}'.format(k)) + hsh = bytes.fromhex(k) + if not overwrite and self.hash_index_dir.have(k): + logg.debug('tx store already has {}'.format(k)) + return + + self.hash_index_dir.add(hsh, v.encode('utf-8')) + + if self.rpc != None: + self.notifier.notify('retrieve and cache raw tx data for {}'.format(k)) + o = transaction(k) + r = self.rpc.conn.do(o) + raw = bytes.fromhex(strip_0x(r['raw'])) + self.raw_index_dir.add(hsh, raw) diff --git a/clicada/tx/tx.py b/clicada/tx/tx.py index 2cbea44..7f6061d 100644 --- a/clicada/tx/tx.py +++ b/clicada/tx/tx.py @@ -16,11 +16,32 @@ from clicada.error import ( ExpiredRecordError, MetadataNotFoundError, ) +from chainlib.eth.address import AddressChecksum logg = logging.getLogger(__name__) +address_checksummer = AddressChecksum() + + +class FormattedTokenTx(TokenTx): + + def __init__(self): + super(FormattedTokenTx, self).__init__() + self.symmetric = True + + def __str__(self): + if self.symmetric: + return '{} {} => {} {} {}'.format( + self.date_block_label, + self.sender_label, + self.recipient_label, + self.destination_token_label, + self.to_value_label, + ) + + +class ResolvedTokenTx(FormattedTokenTx): -class ResolvedTokenTx(TokenTx): def __init__(self): super(ResolvedTokenTx, self).__init__() @@ -28,13 +49,12 @@ class ResolvedTokenTx(TokenTx): self.destination_token_name = None self.source_token_decimals = None self.destination_token_decimals = None - self.symmetric = True self.sender_entity = None self.recipient_entity = None - def resolve_tokens(self, token_store, show_decimals=False, update=False): - (token_symbol, token_decimals) = token_store.by_address(self.source_token) + def resolve_tokens(self, token_store, show_decimals=False, update=False, lookup=False): + (token_symbol, token_decimals) = token_store.by_address(self.source_token, lookup=False) self.source_token_decimals = token_decimals self.source_token_label = token_symbol token_value = self.to_value / (10 ** token_decimals) @@ -59,45 +79,34 @@ class ResolvedTokenTx(TokenTx): self.to_value_label = fmt.format(token_value) - def resolve_entity(self, user_store, address): + def resolve_entity(self, user_store, address, update=False, lookup=True): try: - r = user_store.by_address(address) + r = user_store.by_address(address, update=update, lookup=lookup) except MetadataNotFoundError: - return address + return address_checksummer.sum(address) return str(r) - def resolve_sender_entity(self, user_store, update=False): + def resolve_sender_entity(self, user_store, update=False, lookup=True): if self.tx_type == TokenTxType.faucet_giveto.value: return 'FAUCET' - return self.resolve_entity(user_store, self.sender) + return self.resolve_entity(user_store, self.sender, update=update, lookup=lookup) - def resolve_recipient_entity(self, user_store, update=False): - return self.resolve_entity(user_store, self.recipient) + def resolve_recipient_entity(self, user_store, update=False, lookup=True): + return self.resolve_entity(user_store, self.recipient, update=update, lookup=lookup) - def resolve_entities(self, user_store, update=False): - self.sender_label = self.resolve_sender_entity(user_store, update=update) - self.recipient_label = self.resolve_recipient_entity(user_store, update=update) + def resolve_entities(self, user_store, update=False, lookup=True): + self.sender_label = self.resolve_sender_entity(user_store, update=update, lookup=lookup) + self.recipient_label = self.resolve_recipient_entity(user_store, update=update, lookup=lookup) - def resolve(self, token_store, user_store, show_decimals=False, update=False): - self.resolve_tokens(token_store, show_decimals, update=update) + def resolve(self, token_store, user_store, show_decimals=False, update=False, lookup=True): + self.resolve_tokens(token_store, show_decimals, update=update, lookup=lookup) self.resolve_entities(user_store, update=update) - def __str__(self): - if self.symmetric: - return '{} {} => {} {} {}'.format( - self.date_block_label, - self.sender_label, - self.recipient_label, - self.destination_token_label, - self.to_value_label, - ) - - class TxGetter: def __init__(self, cache_url, limit=0): diff --git a/clicada/user/file.py b/clicada/user/file.py index 02bf037..e48a341 100644 --- a/clicada/user/file.py +++ b/clicada/user/file.py @@ -65,7 +65,7 @@ class Account(Person): class FileUserStore: - def __init__(self, metadata_opener, chain_spec, label, store_base_path, ttl, encrypter=None): + def __init__(self, metadata_opener, chain_spec, label, store_base_path, ttl, encrypter=None, notifier=None): invalidate_before = datetime.datetime.now() - datetime.timedelta(seconds=ttl) self.invalidate_before = int(invalidate_before.timestamp()) self.have_xattr = False @@ -83,6 +83,7 @@ class FileUserStore: self.metadata_opener = metadata_opener self.failed_entities = {} self.encrypter = encrypter + self.notifier = notifier def __validate_dir(self): @@ -191,7 +192,6 @@ class FileUserStore: if self.encrypter != None: v = self.encrypter.decrypt(k, v) - logg.debug('>>>>>>>>>>>>< v decoded {}'.format(v)) v = v.decode('utf-8') logg.debug('retrieved {} from {}'.format(k, p)) @@ -215,6 +215,8 @@ class FileUserStore: logg.info(e) pass + self.notifier.notify('wallet address for phone {} not found locally, retrieve from metadata service'.format(phone)) + getter = self.metadata_opener ptr = generate_metadata_pointer(phone.encode('utf-8'), MetadataPointer.PHONE) r = None @@ -239,13 +241,12 @@ class FileUserStore: return person_data - def by_address(self, address, update=False): + def by_address(self, address, update=False, lookup=True): address = tx_normalize.wallet_address(address) address = strip_0x(address) - #if self.failed_entities.get(address): if self.is_dud(address): logg.debug('already tried and failed {}, skipping'.format(address)) - return address + raise MetadataNotFoundError() ignore_expired = self.sticky(address) @@ -260,6 +261,8 @@ class FileUserStore: logg.info(e) pass + self.notifier.notify('metadata for wallet {} not found locally, retrieve from metadata service'.format(address)) + getter = self.metadata_opener ptr = generate_metadata_pointer(bytes.fromhex(address), MetadataPointer.PERSON) @@ -277,6 +280,8 @@ class FileUserStore: person = Account() person_data = person.deserialize(person_data=data) + logg.debug('wallet {} resolved to {}'.format(address, str(person))) + self.notifier.notify('wallet {} resolved to {}, retrieve extended metadata from metadata service'.format(address, str(person))) ptr = generate_metadata_pointer(bytes.fromhex(address), MetadataPointer.CUSTOM) r = None try: diff --git a/requirements.txt b/requirements.txt index 2003eba..04f2aee 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ hexathon~=0.1.0 pycryptodome~=3.10.1 chainlib-eth~=0.0.21 chainlib~=0.0.17 +leveldir~=0.1.0 diff --git a/setup.cfg b/setup.cfg index 17328ab..6af58cc 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = clicada -version = 0.0.6a2 +version = 0.0.7rc1 description = CLI CRM tool for the cic-stack custodial wallet system author = Louis Holbrook author_email = dev@holbrook.no