feat: Threaded resolution of txs and tokens #12

Open
lash wants to merge 11 commits from lash/thread into master
4 changed files with 95 additions and 42 deletions
Showing only changes of commit cbf058e4e8 - Show all commits

View File

@ -55,6 +55,8 @@ def extra_args():
def apply_args(config, args):
if config.get('META_LOOKUP_METHOD'):
raise NotImplementedError('Sorry, currently only "phone" lookup method is implemented')
if config.true('_FORCE_ALL'):
config.add(True, '_FORCE', exists_ok=True)
def validate(config, args):
@ -122,19 +124,36 @@ Tags: {}""".format(
if ctrl.get('_RAW_TX'):
raw_rpc = ctrl.rpc
tx_store = FileTxStore(store_path, rpc=raw_rpc, notifier=ctrl)
tx_lines = []
seen_tokens = {}
tx_threads = []
token_resolver_queue = Queue()
token_result_queue = Queue()
tx_queue = Queue()
token_resolver_worker = TokenResolverWorker(user_address, ctrl, token_store, token_resolver_queue, token_result_queue)
token_resolver_worker.start()
wallets = []
for tx in txs['data']:
token_resolver_queue.put_nowait(tx['source_token'])
token_resolver_queue.put_nowait(tx['destination_token'])
if tx['sender'] not in wallets:
logg.info('adding wallet {} to metadata lookup'.format(tx['sender']))
wallets.append(tx['sender'])
if tx['recipient'] not in wallets:
wallets.append(tx['recipient'])
logg.info('registered wallet {} for metadata lookup'.format(tx['recipient']))
wallet_threads = []
for a in wallets:
thread_wallet = MetadataResolverWorker(a, ctrl, user_address_store)
thread_wallet.start()
wallet_threads.append(thread_wallet)
ctrl.notify('wait for metadata resolvers to finish work')
for t in wallet_threads:
t.join()
tx_store = FileTxStore(store_path, rpc=raw_rpc, notifier=ctrl)
tx_threads = []
tx_queue = Queue()
tx_n = 0
for tx_src in txs['data']:
tx_hash = strip_0x(tx_src['tx_hash'])
@ -182,6 +201,24 @@ Tags: {}""".format(
ctrl.write(tx_buf[k])
class MetadataResolverWorker(threading.Thread):
def __init__(self, wallet_address, ctrl, user_address_store):
self.user_address_store = user_address_store
self.wallet_address = wallet_address
#tx.resolve(self.token_store, self.user_address_store, show_decimals=self.show_decimals, update=self.update)
self.ctrl = ctrl
super(MetadataResolverWorker, self).__init__()
def run(self):
self.ctrl.notify('resolve metadata for {}'.format(self.wallet_address))
try:
self.user_address_store.by_address(self.wallet_address)
except MetadataNotFoundError:
logg.info('failed metadata lookup for {}'.format(self.wallet_address))
class TxResolverWorker(threading.Thread):
def __init__(self, tx_hash, tx_src, ctrl, tx_store, token_store, user_address_store, token_queue, tx_queue, show_decimals=True, update=None):
self.tx_hash = tx_hash
@ -200,13 +237,11 @@ class TxResolverWorker(threading.Thread):
def run(self):
self.ctrl.notify('resolve details for tx {}'.format(self.tx_hash))
tx = ResolvedTokenTx.from_dict(self.tx_src)
tx.resolve(self.token_store, self.user_address_store, show_decimals=self.show_decimals, update=self.update)
self.token_queue.put_nowait(tx.source_token)
self.token_queue.put_nowait(tx.destination_token)
try:
self.tx_store.put(self.tx_hash, str(self.tx_src), overwrite=self.ctrl.get('_FORCE_ALL'))
except FileExistsError:
logg.debug('skip put of already existing tx entry {}'.format(self.tx_hash))
tx.resolve(self.token_store, self.user_address_store, show_decimals=self.show_decimals, update=self.update, lookup=False)
#tx.resolve_tokens(self.token_store, self.user_address_store, show_decimals=self.show_decimals, update=self.update)
#self.token_queue.put_nowait(tx.source_token)
#self.token_queue.put_nowait(tx.destination_token)
self.tx_store.put(self.tx_hash, str(self.tx_src), overwrite=self.ctrl.get('_FORCE_ALL'))
self.tx_queue.put(tx)

View File

@ -52,7 +52,7 @@ class FileTokenStore:
return p
def by_address(self, address):
def by_address(self, address, update=False, lookup=True):
address = tx_normalize.executable_address(address)
token_symbol = self.memstore_symbol.get(address)
@ -65,17 +65,30 @@ class FileTokenStore:
try:
f = open(p, 'r')
except FileNotFoundError:
p = self.__cache_token(address)
f = open(p, 'r')
token_symbol = f.read()
f.close()
pass
p = os.path.join(self.store_path, token_symbol)
f = open(p, 'r')
r = f.read()
f.close()
token_decimals = int(r)
if f == None:
if not lookup:
token_symbol = '???'
token_decimals = '???'
#self.memstore_symbol.put(address, token_symbol)
#self.memstore_decimals.put(address, token_decimals)
#logg.warning('token metadata not found and lookup deactivated. Will use 18 decimals as default')
#return (token_symbol, token_decimals,)
if token_symbol == None:
if f == None:
p = self.__cache_token(address)
f = open(p, 'r')
token_symbol = f.read()
f.close()
p = os.path.join(self.store_path, token_symbol)
f = open(p, 'r')
r = f.read()
f.close()
token_decimals = int(r)
self.memstore_symbol.put(address, token_symbol)
self.memstore_decimals.put(token_symbol, token_decimals)

View File

@ -16,12 +16,16 @@ from clicada.error import (
ExpiredRecordError,
MetadataNotFoundError,
)
from chainlib.eth.address import AddressChecksum
logg = logging.getLogger(__name__)
address_checksummer = AddressChecksum()
class ResolvedTokenTx(TokenTx):
def __init__(self):
super(ResolvedTokenTx, self).__init__()
self.source_token_name = None
@ -33,8 +37,8 @@ class ResolvedTokenTx(TokenTx):
self.recipient_entity = None
def resolve_tokens(self, token_store, show_decimals=False, update=False):
(token_symbol, token_decimals) = token_store.by_address(self.source_token)
def resolve_tokens(self, token_store, show_decimals=False, update=False, lookup=False):
(token_symbol, token_decimals) = token_store.by_address(self.source_token, lookup=False)
self.source_token_decimals = token_decimals
self.source_token_label = token_symbol
token_value = self.to_value / (10 ** token_decimals)
@ -59,31 +63,31 @@ class ResolvedTokenTx(TokenTx):
self.to_value_label = fmt.format(token_value)
def resolve_entity(self, user_store, address):
def resolve_entity(self, user_store, address, update=False, lookup=True):
try:
r = user_store.by_address(address)
r = user_store.by_address(address, update=update, lookup=lookup)
except MetadataNotFoundError:
return address
return address_checksummer.sum(address)
return str(r)
def resolve_sender_entity(self, user_store, update=False):
def resolve_sender_entity(self, user_store, update=False, lookup=True):
if self.tx_type == TokenTxType.faucet_giveto.value:
return 'FAUCET'
return self.resolve_entity(user_store, self.sender)
return self.resolve_entity(user_store, self.sender, update=update, lookup=lookup)
def resolve_recipient_entity(self, user_store, update=False):
return self.resolve_entity(user_store, self.recipient)
def resolve_recipient_entity(self, user_store, update=False, lookup=True):
return self.resolve_entity(user_store, self.recipient, update=update, lookup=lookup)
def resolve_entities(self, user_store, update=False):
self.sender_label = self.resolve_sender_entity(user_store, update=update)
self.recipient_label = self.resolve_recipient_entity(user_store, update=update)
def resolve_entities(self, user_store, update=False, lookup=True):
self.sender_label = self.resolve_sender_entity(user_store, update=update, lookup=lookup)
self.recipient_label = self.resolve_recipient_entity(user_store, update=update, lookup=lookup)
def resolve(self, token_store, user_store, show_decimals=False, update=False):
self.resolve_tokens(token_store, show_decimals, update=update)
def resolve(self, token_store, user_store, show_decimals=False, update=False, lookup=True):
self.resolve_tokens(token_store, show_decimals, update=update, lookup=lookup)
self.resolve_entities(user_store, update=update)

View File

@ -241,13 +241,13 @@ class FileUserStore:
return person_data
def by_address(self, address, update=False):
def by_address(self, address, update=False, lookup=True):
address = tx_normalize.wallet_address(address)
address = strip_0x(address)
#if self.failed_entities.get(address):
if self.is_dud(address):
logg.debug('already tried and failed {}, skipping'.format(address))
return address
raise MetadataNotFoundError()
ignore_expired = self.sticky(address)
@ -281,6 +281,7 @@ class FileUserStore:
person = Account()
person_data = person.deserialize(person_data=data)
logg.debug('wallet {} resolved to {}'.format(address, str(person)))
self.notifier.notify('wallet {} resolved to {}, retrieve extended metadata from metadata service'.format(address, str(person)))
ptr = generate_metadata_pointer(bytes.fromhex(address), MetadataPointer.CUSTOM)
r = None