From 616fa2e15c5badefffa94a55d579be2c73a19117 Mon Sep 17 00:00:00 2001 From: lash Date: Sat, 22 Jan 2022 14:36:58 +0000 Subject: [PATCH] WIP Add threaded lookups (currently notifier gets confused) --- clicada/cli/user.py | 125 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 108 insertions(+), 17 deletions(-) diff --git a/clicada/cli/user.py b/clicada/cli/user.py index 41f2533..ed7d263 100644 --- a/clicada/cli/user.py +++ b/clicada/cli/user.py @@ -2,6 +2,8 @@ import sys import logging import datetime +import threading +from queue import SimpleQueue as Queue # external imports from cic_eth_registry import CICRegistry @@ -123,26 +125,115 @@ Tags: {}""".format( tx_store = FileTxStore(store_path, rpc=raw_rpc, notifier=ctrl) tx_lines = [] seen_tokens = {} + + tx_threads = [] + + token_resolver_queue = Queue() + token_result_queue = Queue() + tx_queue = Queue() + token_resolver_worker = TokenResolverWorker(user_address, ctrl, token_store, token_resolver_queue, token_result_queue) + token_resolver_worker.start() + for tx_src in txs['data']: tx_hash = strip_0x(tx_src['tx_hash']) - ctrl.notify('resolve details for tx {}'.format(tx_hash)) - tx = ResolvedTokenTx.from_dict(tx_src) - tx.resolve(token_store, user_address_store, show_decimals=True, update=ctrl.get('_FORCE')) - tx_lines.append(tx) - seen_tokens[tx.source_token_label] = tx.source_token - seen_tokens[tx.destination_token_label] = tx.destination_token - tx_store.put(tx_hash, str(tx_src), overwrite=ctrl.get('_FORCE_ALL')) + tx_worker = TxResolverWorker(tx_hash, tx_src, ctrl, tx_store, token_store, user_address_store, token_resolver_queue, tx_queue, show_decimals=True, update=ctrl.get('_FORCE')) + tx_thread = tx_worker.start() + tx_threads.append(tx_worker) + + +# tx = ResolvedTokenTx.from_dict(tx_src) +# tx.resolve(token_store, user_address_store, show_decimals=True, update=ctrl.get('_FORCE')) +# tx_lines.append(tx) +# seen_tokens[tx.source_token_label] = tx.source_token +# seen_tokens[tx.destination_token_label] = tx.destination_token +# tx_store.put(tx_hash, str(tx_src), overwrite=ctrl.get('_FORCE_ALL')) + + # needs to be blocked by wait for all txs + + for tx_thread in tx_threads: + tx_thread.join() + + tx_queue.put(None) + token_result_queue.put_nowait(None) ctrl.write("Balances:") - for k in seen_tokens.keys(): - ctrl.notify('resolve token {}'.format(seen_tokens[k])) - (token_symbol, token_decimals) = token_store.by_address(seen_tokens[k]) - ctrl.notify('get token balance for {} => {}'.format(token_symbol, seen_tokens[k])) - balance = token_balance(ctrl.chain(), ctrl.conn(), seen_tokens[k], user_address) - fmt = '{:.' + str(token_decimals) + 'f}' - decimal_balance = fmt.format(balance / (10 ** token_decimals)) - ctrl.write(" {} {}".format(token_symbol, decimal_balance)) - print() - for l in tx_lines: + while True: + l = tx_queue.get() + if l == None: + token_resolver_queue.put(None) + break ctrl.write(l) + + token_resolver_worker.join() + + while True: + l = token_result_queue.get() + logg.debug('llll {}'.format(l)) + if l == None: + break + ctrl.write(' {} {}'.format(l[0], l[1])) + + + + +class TxResolverWorker(threading.Thread): + def __init__(self, tx_hash, tx_src, ctrl, tx_store, token_store, user_address_store, token_queue, tx_queue, show_decimals=True, update=None): + self.tx_hash = tx_hash + self.tx_src = tx_src + self.ctrl = ctrl + self.token_store = token_store + self.user_address_store = user_address_store + self.show_decimals = show_decimals + self.update = update + self.token_queue = token_queue + self.tx_store = tx_store + self.tx_queue = tx_queue + super(TxResolverWorker, self).__init__() + + + def run(self): + self.ctrl.notify('resolve details for tx {}'.format(self.tx_hash)) + tx = ResolvedTokenTx.from_dict(self.tx_src) + tx.resolve(self.token_store, self.user_address_store, show_decimals=self.show_decimals, update=self.update) + self.token_queue.put(tx.source_token) + self.token_queue.put(tx.destination_token) + self.tx_store.put(self.tx_hash, str(self.tx_src), overwrite=self.ctrl.get('_FORCE_ALL')) + self.tx_queue.put(tx) + + +class TokenResolverWorker(threading.Thread): + + def __init__(self, wallet_address, ctrl, token_store, in_queue, out_queue): + super(TokenResolverWorker, self).__init__() + self.ctrl = ctrl + self.token_store = token_store + self.in_queue = in_queue + self.out_queue = out_queue + self.seen_tokens = {} + self.wg = threading.Lock() + self.wallet_address = strip_0x(wallet_address) + + + def run(self): + while True: + token_address = self.in_queue.get() + self.wg.acquire() + if token_address == None: + logg.debug('token resolver end') + return + token_address = strip_0x(token_address) + if self.seen_tokens.get(token_address) != None: + self.wg.release() + continue + logg.debug('resolve token {}'.format(token_address)) + self.ctrl.notify('resolve token {}'.format(token_address)) + (token_symbol, token_decimals) = self.token_store.by_address(token_address) + self.seen_tokens[token_address] = token_address + self.ctrl.notify('get token balance for {} => {}'.format(token_symbol, self.seen_tokens[token_address])) + balance = token_balance(self.ctrl.chain(), self.ctrl.conn(), self.seen_tokens[token_address], self.wallet_address) + fmt = '{:.' + str(token_decimals) + 'f}' + decimal_balance = fmt.format(balance / (10 ** token_decimals)) + logg.debug('token balance for {} ({}) is {}'.format(token_symbol, token_address, decimal_balance)) + self.out_queue.put((token_symbol, decimal_balance,)) + self.wg.release()