Revert to multi-token task queue entry point for token api call

This commit is contained in:
nolash 2021-10-09 13:24:57 +02:00
parent 6098374a4e
commit 77fe41da4b
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
4 changed files with 73 additions and 91 deletions

View File

@ -45,28 +45,25 @@ class Api(ApiBase):
proof = [proof] proof = [proof]
chain_spec_dict = self.chain_spec.asdict() chain_spec_dict = self.chain_spec.asdict()
i = 0 s_token_resolve = celery.signature(
s_group = [] 'cic_eth.eth.erc20.resolve_tokens_by_symbol',
for token_symbol in token_symbols: [
s_token_resolve = celery.signature( token_symbols,
'cic_eth.eth.erc20.resolve_token_by_symbol', chain_spec_dict,
[ ],
token_symbol, queue=self.queue,
chain_spec_dict, )
],
queue=self.queue,
)
s_token_info = celery.signature( s_token_info = celery.signature(
'cic_eth.eth.erc20.token_info', 'cic_eth.eth.erc20.token_info',
[ [
chain_spec_dict, chain_spec_dict,
proof[i], proof,
], ],
queue=self.queue, queue=self.queue,
) )
s_token_verify = celery.signature( s_token_verify = celery.signature(
'cic_eth.eth.erc20.verify_token_info', 'cic_eth.eth.erc20.verify_token_info',
[ [
chain_spec_dict, chain_spec_dict,
@ -75,18 +72,10 @@ class Api(ApiBase):
], ],
queue=self.queue, queue=self.queue,
) )
i += 1
s_token_info.link(s_token_verify)
s_token_resolve.link(s_token_info)
#if self.callback_param != None:
# s_token_verify.link(self.callback_success)
# s_token_verify.on_error(self.callback_error)
s_group.append(s_token_resolve)
# return s_token_resolve.apply_async() s_token_info.link(s_token_verify)
return celery.group(s_group)() s_token_resolve.link(s_token_info)
return s_token_resolve.apply_async()
# def convert_transfer(self, from_address, to_address, target_return, minimum_return, from_token_symbol, to_token_symbol): # def convert_transfer(self, from_address, to_address, target_return, minimum_return, from_token_symbol, to_token_symbol):

View File

@ -328,26 +328,6 @@ def approve(self, tokens, holder_address, spender_address, value, chain_spec_dic
return tx_hash_hex return tx_hash_hex
@celery_app.task(bind=True, base=CriticalWeb3Task)
def resolve_token_by_symbol(self, token_symbol, chain_spec_dict):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
rpc = RPCConnection.connect(chain_spec, 'default')
registry = CICRegistry(chain_spec, rpc)
session = self.create_session()
sender_address = AccountRole.get_address('DEFAULT', session)
session.close()
token_address = registry.by_name(token_symbol, sender_address=sender_address)
logg.debug('token {}'.format(token_address))
token = {
'address': token_address,
'symbol': token_symbol,
'converters': [],
}
return token
@celery_app.task(bind=True, base=CriticalWeb3Task) @celery_app.task(bind=True, base=CriticalWeb3Task)
def resolve_tokens_by_symbol(self, token_symbols, chain_spec_dict): def resolve_tokens_by_symbol(self, token_symbols, chain_spec_dict):
"""Returns contract addresses of an array of ERC20 token symbols """Returns contract addresses of an array of ERC20 token symbols
@ -498,52 +478,59 @@ def cache_approve_data(
@celery_app.task(bind=True, base=BaseTask) @celery_app.task(bind=True, base=BaseTask)
def token_info(self, token, chain_spec_dict, proofs=[]): def token_info(self, tokens, chain_spec_dict, proofs=[]):
chain_spec = ChainSpec.from_dict(chain_spec_dict) chain_spec = ChainSpec.from_dict(chain_spec_dict)
rpc = RPCConnection.connect(chain_spec, 'default') rpc = RPCConnection.connect(chain_spec, 'default')
result_data = [] i = 0
token_chain_object = ERC20Token(chain_spec, rpc, add_0x(token['address']))
token_chain_object.load(rpc)
token_symbol_proof_hex = to_identifier(token_chain_object.symbol) for token in tokens:
token_proofs = [token_symbol_proof_hex] result_data = []
token_proofs += proofs token_chain_object = ERC20Token(chain_spec, rpc, add_0x(token['address']))
token_chain_object.load(rpc)
token_data = { token_symbol_proof_hex = to_identifier(token_chain_object.symbol)
'decimals': token_chain_object.decimals, token_proofs = [token_symbol_proof_hex]
'name': token_chain_object.name, token_proofs += proofs[i]
'symbol': token_chain_object.symbol,
'address': tx_normalize.executable_address(token_chain_object.address),
'proofs': token_proofs,
}
return token_data tokens[i] = {
'decimals': token_chain_object.decimals,
'name': token_chain_object.name,
'symbol': token_chain_object.symbol,
'address': tx_normalize.executable_address(token_chain_object.address),
'proofs': token_proofs,
'converters': tokens[i]['converters'],
}
i += 1
return tokens
@celery_app.task(bind=True, base=BaseTask) @celery_app.task(bind=True, base=BaseTask)
def verify_token_info(self, token, chain_spec_dict, success_callback, error_callback): def verify_token_info(self, tokens, chain_spec_dict, success_callback, error_callback):
queue = self.request.delivery_info.get('routing_key') queue = self.request.delivery_info.get('routing_key')
s = celery.signature(
'cic_eth.eth.trust.verify_proofs', for token in tokens:
[ s = celery.signature(
token, 'cic_eth.eth.trust.verify_proofs',
token['address'], [
token['proofs'], token,
chain_spec_dict, token['address'],
success_callback, token['proofs'],
error_callback, chain_spec_dict,
], success_callback,
queue=queue, error_callback,
) ],
s.link(success_callback) queue=queue,
s.on_error(error_callback) )
s.apply_async() s.link(success_callback)
s.on_error(error_callback)
s.apply_async()
#s_group.append(s) #s_group.append(s)
#celery.group(s_group)() #celery.group(s_group)()
return token return tokens
@celery_app.task(bind=True, base=BaseTask) @celery_app.task(bind=True, base=BaseTask)

View File

@ -1,5 +1,9 @@
# standard imports # standard imports
import logging import logging
import mmap
# standard imports
import tempfile
# external imports # external imports
import celery import celery
@ -10,19 +14,20 @@ logg = logging.getLogger()
celery_app = celery.current_app celery_app = celery.current_app
class CallbackTask(celery.Task): class CallbackTask(celery.Task):
errs = {}
oks = {} mmap_path = tempfile.mkdtemp()
@celery_app.task(bind=True, base=CallbackTask) @celery_app.task(bind=True, base=CallbackTask)
def test_error_callback(self, a, b, c): def test_error_callback(self, a, b, c):
o = CallbackTask.oks
s = 'ok' s = 'ok'
if c > 0: if c > 0:
o = CallbackTask.errs
s = 'err' s = 'err'
if o.get(b) == None: fp = os.path.join(self.mmap_path, b)
o[b] = [] f = open(fp, 'wb')
o[b].append(a) m = mmap.mmap(f.fileno(), access=mmap.ACCESS_WRITE, length=1)
m.write(c.to_bytes(1, 'big'))
m.close()
f.close()
logg.debug('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> test callback ({}): {} {} {}'.format(s, a, b, c)) logg.debug('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> test callback ({}): {} {} {} -> {}'.format(s, a, b, c, fp))

View File

@ -78,6 +78,7 @@ def test_tokens(
t = api.tokens(['FOO'], proof=[[bogus_proof]]) t = api.tokens(['FOO'], proof=[[bogus_proof]])
r = t.get() r = t.get()
logg.debug('r {}'.format(r)) logg.debug('r {}'.format(r))
time.sleep(0.1)
assert len(CallbackTask.errs[api_param]) == 1 assert len(CallbackTask.errs[api_param]) == 1
assert CallbackTask.oks.get(api_param) == None assert CallbackTask.oks.get(api_param) == None