diff --git a/clicada/cli/user.py b/clicada/cli/user.py index ed7d263..a4e4ce3 100644 --- a/clicada/cli/user.py +++ b/clicada/cli/user.py @@ -62,7 +62,7 @@ def validate(config, args): def execute(ctrl): - tx_getter = TxGetter(ctrl.get('TX_CACHE_URL'), 10) + tx_getter = TxGetter(ctrl.get('TX_CACHE_URL'), 50) store_path = '.clicada' user_phone_file_label = 'phone' @@ -131,14 +131,17 @@ Tags: {}""".format( token_resolver_queue = Queue() token_result_queue = Queue() tx_queue = Queue() + token_resolver_worker = TokenResolverWorker(user_address, ctrl, token_store, token_resolver_queue, token_result_queue) token_resolver_worker.start() + tx_n = 0 for tx_src in txs['data']: tx_hash = strip_0x(tx_src['tx_hash']) tx_worker = TxResolverWorker(tx_hash, tx_src, ctrl, tx_store, token_store, user_address_store, token_resolver_queue, tx_queue, show_decimals=True, update=ctrl.get('_FORCE')) tx_thread = tx_worker.start() tx_threads.append(tx_worker) + tx_n += 1 # tx = ResolvedTokenTx.from_dict(tx_src) @@ -150,31 +153,44 @@ Tags: {}""".format( # needs to be blocked by wait for all txs + + tx_buf = {} + + for i in range(0, tx_n): + tx = tx_queue.get() + if tx == None: + break + # ugh, ugly + #k = float('{}.{}'.format(tx.block_number, tx.tx_index)) + # tx_index is missing, this is temporary sort measure + k = str(tx.block_number) + '.' + tx.tx_hash + tx_buf[k] = tx + + ctrl.notify('wait for transaction getters to finish work') for tx_thread in tx_threads: tx_thread.join() - tx_queue.put(None) - token_result_queue.put_nowait(None) - - ctrl.write("Balances:") - - while True: - l = tx_queue.get() - if l == None: - token_resolver_queue.put(None) - break - ctrl.write(l) - - token_resolver_worker.join() + token_resolver_queue.put_nowait(None) + token_buf = '' while True: l = token_result_queue.get() - logg.debug('llll {}'.format(l)) if l == None: break - ctrl.write(' {} {}'.format(l[0], l[1])) + #ctrl.write(' {} {}'.format(l[0], l[1])) + token_buf += ' {} {}\n'.format(l[0], l[1]) + ctrl.notify('wait for token resolver to finish work') + token_resolver_worker.join() + ctrl.write("Balances:") + ctrl.write(token_buf) + ctrl.write('') + ks = list(tx_buf.keys()) + ks.sort() + ks.reverse() + for k in ks: + ctrl.write(tx_buf[k]) class TxResolverWorker(threading.Thread): @@ -196,9 +212,12 @@ class TxResolverWorker(threading.Thread): self.ctrl.notify('resolve details for tx {}'.format(self.tx_hash)) tx = ResolvedTokenTx.from_dict(self.tx_src) tx.resolve(self.token_store, self.user_address_store, show_decimals=self.show_decimals, update=self.update) - self.token_queue.put(tx.source_token) - self.token_queue.put(tx.destination_token) - self.tx_store.put(self.tx_hash, str(self.tx_src), overwrite=self.ctrl.get('_FORCE_ALL')) + self.token_queue.put_nowait(tx.source_token) + self.token_queue.put_nowait(tx.destination_token) + try: + self.tx_store.put(self.tx_hash, str(self.tx_src), overwrite=self.ctrl.get('_FORCE_ALL')) + except FileExistsError: + logg.debug('skip put of already existing tx entry {}'.format(self.tx_hash)) self.tx_queue.put(tx) @@ -211,20 +230,18 @@ class TokenResolverWorker(threading.Thread): self.in_queue = in_queue self.out_queue = out_queue self.seen_tokens = {} - self.wg = threading.Lock() self.wallet_address = strip_0x(wallet_address) def run(self): while True: token_address = self.in_queue.get() - self.wg.acquire() if token_address == None: logg.debug('token resolver end') + self.out_queue.put_nowait(None) return token_address = strip_0x(token_address) if self.seen_tokens.get(token_address) != None: - self.wg.release() continue logg.debug('resolve token {}'.format(token_address)) self.ctrl.notify('resolve token {}'.format(token_address)) @@ -236,4 +253,3 @@ class TokenResolverWorker(threading.Thread): decimal_balance = fmt.format(balance / (10 ** token_decimals)) logg.debug('token balance for {} ({}) is {}'.format(token_symbol, token_address, decimal_balance)) self.out_queue.put((token_symbol, decimal_balance,)) - self.wg.release()