diff --git a/apps/cic-eth/cic_eth/api/api_task.py b/apps/cic-eth/cic_eth/api/api_task.py index 61bc9e39..31a18466 100644 --- a/apps/cic-eth/cic_eth/api/api_task.py +++ b/apps/cic-eth/cic_eth/api/api_task.py @@ -17,7 +17,8 @@ from cic_eth.enum import LockEnum app = celery.current_app -logg = logging.getLogger(__name__) +#logg = logging.getLogger(__name__) +logg = logging.getLogger() class Api(ApiBase): @@ -41,39 +42,51 @@ class Api(ApiBase): def tokens(self, token_symbols, proof=None): if isinstance(proof, str): - proof = [[proof]] + proof = [proof] chain_spec_dict = self.chain_spec.asdict() - s_token_resolve = celery.signature( - 'cic_eth.eth.erc20.resolve_tokens_by_symbol', - [ - token_symbols, - chain_spec_dict, - ], - queue=self.queue, - ) - s_token = celery.signature( - 'cic_eth.eth.erc20.token_info', - [ - chain_spec_dict, - proof, - ], - queue=self.queue, - ) + i = 0 + s_group = [] + for token_symbol in token_symbols: + s_token_resolve = celery.signature( + 'cic_eth.eth.erc20.resolve_token_by_symbol', + [ + token_symbol, + chain_spec_dict, + ], + queue=self.queue, + ) -# s_token_verify = celery.signature( -# 'cic_eth.eth.erc20.verify_token_info', -# [ -# chain_spec_dict, -# ], -# queue=self.queue, -# ) - #s_token.link(s_token_verify) - s_token_resolve.link(s_token) - if self.callback_param != None: - s_token.link(self.callback_success) + s_token_info = celery.signature( + 'cic_eth.eth.erc20.token_info', + [ + chain_spec_dict, + proof[i], + ], + queue=self.queue, + ) - return s_token_resolve.apply_async() + s_token_verify = celery.signature( + 'cic_eth.eth.erc20.verify_token_info', + [ + chain_spec_dict, + self.callback_success, + self.callback_error, + ], + queue=self.queue, + ) + i += 1 + + s_token_info.link(s_token_verify) + s_token_resolve.link(s_token_info) + #if self.callback_param != None: + # s_token_verify.link(self.callback_success) + # s_token_verify.on_error(self.callback_error) + + s_group.append(s_token_resolve) + + # return s_token_resolve.apply_async() + return celery.group(s_group)() # def convert_transfer(self, from_address, to_address, target_return, minimum_return, from_token_symbol, to_token_symbol): diff --git a/apps/cic-eth/cic_eth/callbacks/noop.py b/apps/cic-eth/cic_eth/callbacks/noop.py index 2fd579e3..2308734c 100644 --- a/apps/cic-eth/cic_eth/callbacks/noop.py +++ b/apps/cic-eth/cic_eth/callbacks/noop.py @@ -1,7 +1,10 @@ +import logging + import celery celery_app = celery.current_app -logg = celery_app.log.get_default_logger() +#logg = celery_app.log.get_default_logger() +logg = logging.getLogger() @celery_app.task(bind=True) diff --git a/apps/cic-eth/cic_eth/eth/erc20.py b/apps/cic-eth/cic_eth/eth/erc20.py index ff13d283..a43f65cc 100644 --- a/apps/cic-eth/cic_eth/eth/erc20.py +++ b/apps/cic-eth/cic_eth/eth/erc20.py @@ -328,6 +328,26 @@ def approve(self, tokens, holder_address, spender_address, value, chain_spec_dic return tx_hash_hex +@celery_app.task(bind=True, base=CriticalWeb3Task) +def resolve_token_by_symbol(self, token_symbol, chain_spec_dict): + chain_spec = ChainSpec.from_dict(chain_spec_dict) + rpc = RPCConnection.connect(chain_spec, 'default') + registry = CICRegistry(chain_spec, rpc) + + session = self.create_session() + sender_address = AccountRole.get_address('DEFAULT', session) + session.close() + + token_address = registry.by_name(token_symbol, sender_address=sender_address) + logg.debug('token {}'.format(token_address)) + token = { + 'address': token_address, + 'symbol': token_symbol, + 'converters': [], + } + return token + + @celery_app.task(bind=True, base=CriticalWeb3Task) def resolve_tokens_by_symbol(self, token_symbols, chain_spec_dict): """Returns contract addresses of an array of ERC20 token symbols @@ -478,49 +498,52 @@ def cache_approve_data( @celery_app.task(bind=True, base=BaseTask) -def token_info(self, tokens, chain_spec_dict, proofs=[]): +def token_info(self, token, chain_spec_dict, proofs=[]): chain_spec = ChainSpec.from_dict(chain_spec_dict) rpc = RPCConnection.connect(chain_spec, 'default') result_data = [] + token_chain_object = ERC20Token(chain_spec, rpc, add_0x(token['address'])) + token_chain_object.load(rpc) + token_symbol_proof_hex = to_identifier(token_chain_object.symbol) + token_proofs = [token_symbol_proof_hex] + token_proofs += proofs + + token_data = { + 'decimals': token_chain_object.decimals, + 'name': token_chain_object.name, + 'symbol': token_chain_object.symbol, + 'address': tx_normalize.executable_address(token_chain_object.address), + 'proofs': token_proofs, + } + + return token_data + + +@celery_app.task(bind=True, base=BaseTask) +def verify_token_info(self, token, chain_spec_dict, success_callback, error_callback): queue = self.request.delivery_info.get('routing_key') + s = celery.signature( + 'cic_eth.eth.trust.verify_proofs', + [ + token, + token['address'], + token['proofs'], + chain_spec_dict, + success_callback, + error_callback, + ], + queue=queue, + ) + s.link(success_callback) + s.on_error(error_callback) + s.apply_async() + #s_group.append(s) - i = 0 - s_group = [] - for token in tokens: - token_chain_object = ERC20Token(chain_spec, rpc, add_0x(token['address'])) - token_chain_object.load(rpc) + #celery.group(s_group)() - token_symbol_proof_hex = to_identifier(token_chain_object.symbol) - token_proofs = [token_symbol_proof_hex] - if len(proofs) > 0: - token_proofs += proofs[i] - - token_data = { - 'decimals': token_chain_object.decimals, - 'name': token_chain_object.name, - 'symbol': token_chain_object.symbol, - 'address': tx_normalize.executable_address(token_chain_object.address), - } - result_data.append(token_data) - - s = celery.signature( - 'cic_eth.eth.trust.verify_proofs', - [ - token, - token_chain_object.address, - token_proofs, - chain_spec_dict, - ], - queue=queue, - ) - s_group.append(s) - i += 1 - - celery.group(s_group)() - - #return result_data + return token @celery_app.task(bind=True, base=BaseTask) diff --git a/apps/cic-eth/cic_eth/eth/trust.py b/apps/cic-eth/cic_eth/eth/trust.py index 96759042..6298b8e6 100644 --- a/apps/cic-eth/cic_eth/eth/trust.py +++ b/apps/cic-eth/cic_eth/eth/trust.py @@ -17,15 +17,25 @@ from cic_eth.error import TrustError celery_app = celery.current_app logg = logging.getLogger() -@celery_app.task(bind=True, base=BaseTask) -def collect(self, collection): - logg.debug('collect {}'.format(collection)) - return collection +@celery_app.task(bind=True, base=BaseTask) +def verify_proof(self, chained_input, proof, subject, chain_spec_dict, success_callback, error_callback): + proof = strip_0x(proof) + + proofs = [] + + logg.debug('proof count {}'.format(len(proofs))) + if len(proofs) == 0: + logg.debug('error {}'.format(len(proofs))) + raise TrustError('foo') + + return (chained_input, (proof, proofs)) @celery_app.task(bind=True, base=BaseTask) -def verify_proof(self, chained_input, proof, subject, chain_spec_dict): +def verify_proofs(self, chained_input, subject, proofs, chain_spec_dict, success_callback, error_callback): + queue = self.request.delivery_info.get('routing_key') + chain_spec = ChainSpec.from_dict(chain_spec_dict) rpc = RPCConnection.connect(chain_spec, 'default') @@ -37,65 +47,31 @@ def verify_proof(self, chained_input, proof, subject, chain_spec_dict): declarator = Declarator(chain_spec) - logg.debug('foo {}'.format(proof)) - proof = strip_0x(proof) - logg.debug('proof is {}'.format(proof)) + have_proofs = {} - proof_count = 0 - for trusted_address in self.trusted_addresses: - o = declarator.declaration(declarator_address, trusted_address, subject, sender_address=sender_address) - r = rpc.do(o) - declarations = declarator.parse_declaration(r) - logg.debug('comparing proof {} with declarations for {} by {}: {}'.format(proof, subject, trusted_address, declarations)) - - for declaration in declarations: - declaration = strip_0x(declaration) - if declaration == proof: - logg.debug('have token proof {} match for trusted address {}'.format(declaration, trusted_address)) - proof_count += 1 - - logg.debug('proof count {}'.format(proof_count)) - if proof_count == 0: - logg.debug('error {}'.format(proof_count)) - raise TrustError('no trusted records found for subject {} proof {}'.format(subject, proof)) - - return chained_input - - -@celery_app.task(bind=True, base=BaseTask) -def verify_proofs(self, chained_input, subject, proofs, chain_spec_dict): - if isinstance(proofs, str): - proofs = [[proofs]] - elif not isinstance(proofs, list): - raise ValueError('proofs argument must be string or list') - - i = 0 for proof in proofs: - if not isinstance(proof, list): - proofs[i] = [proof] - logg.debug('proof entry {} is not a list'.format(i)) - i += 1 - - queue = self.request.delivery_info.get('routing_key') - s_group = [] - for proof in proofs: - logg.debug('proof before {}'.format(proof)) - if isinstance(proof, str): - proof = [proof] - for single_proof in proof: - logg.debug('proof after {}'.format(single_proof)) - s = celery.signature( - 'cic_eth.eth.trust.verify_proof', - [ - chained_input, - single_proof, - subject, - chain_spec_dict, - ], - queue=queue, - ) - #s_group.append(s) - s.apply_async() + proof = strip_0x(proof) - #return chained_input + have_proofs[proof] = [] + + for trusted_address in self.trusted_addresses: + o = declarator.declaration(declarator_address, trusted_address, subject, sender_address=sender_address) + r = rpc.do(o) + declarations = declarator.parse_declaration(r) + logg.debug('comparing proof {} with declarations for {} by {}: {}'.format(proof, subject, trusted_address, declarations)) + + for declaration in declarations: + declaration = strip_0x(declaration) + if declaration == proof: + logg.debug('have token proof {} match for trusted address {}'.format(declaration, trusted_address)) + have_proofs[proof].append(trusted_address) + + out_proofs = {} + for proof in have_proofs.keys(): + if len(have_proofs[proof]) == 0: + logg.error('missing signer for proof {} subject {}'.format(proof, subject)) + raise TrustError((subject, proof,)) + out_proofs[proof] = have_proofs[proof] + + return (chained_input, out_proofs) diff --git a/apps/cic-eth/cic_eth/pytest/fixtures_celery.py b/apps/cic-eth/cic_eth/pytest/fixtures_celery.py index c67014ca..f2287dab 100644 --- a/apps/cic-eth/cic_eth/pytest/fixtures_celery.py +++ b/apps/cic-eth/cic_eth/pytest/fixtures_celery.py @@ -4,13 +4,12 @@ import tempfile import logging import shutil -# local impors +# local imports from cic_eth.task import BaseTask #logg = logging.getLogger(__name__) logg = logging.getLogger() - @pytest.fixture(scope='function') def init_celery_tasks( contract_roles, @@ -42,6 +41,7 @@ def celery_includes(): 'cic_eth.callbacks.noop', 'cic_eth.callbacks.http', 'cic_eth.pytest.mock.filter', + 'cic_eth.pytest.mock.callback', ] diff --git a/apps/cic-eth/cic_eth/pytest/mock/__init__.py b/apps/cic-eth/cic_eth/pytest/mock/__init__.py index 1e78c9be..71866a19 100644 --- a/apps/cic-eth/cic_eth/pytest/mock/__init__.py +++ b/apps/cic-eth/cic_eth/pytest/mock/__init__.py @@ -1 +1,2 @@ from .filter import * +from .callback import * diff --git a/apps/cic-eth/cic_eth/pytest/mock/callback.py b/apps/cic-eth/cic_eth/pytest/mock/callback.py new file mode 100644 index 00000000..8453c30f --- /dev/null +++ b/apps/cic-eth/cic_eth/pytest/mock/callback.py @@ -0,0 +1,28 @@ +# standard imports +import logging + +# external imports +import celery + +#logg = logging.getLogger(__name__) +logg = logging.getLogger() + +celery_app = celery.current_app + +class CallbackTask(celery.Task): + errs = {} + oks = {} + +@celery_app.task(bind=True, base=CallbackTask) +def test_error_callback(self, a, b, c): + o = CallbackTask.oks + s = 'ok' + if c > 0: + o = CallbackTask.errs + s = 'err' + + if o.get(b) == None: + o[b] = [] + o[b].append(a) + + logg.debug('>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> test callback ({}): {} {} {}'.format(s, a, b, c)) diff --git a/apps/cic-eth/tests/task/api/test_app_noncritical.py b/apps/cic-eth/tests/task/api/test_app_noncritical.py index bd8e8a1f..d5007010 100644 --- a/apps/cic-eth/tests/task/api/test_app_noncritical.py +++ b/apps/cic-eth/tests/task/api/test_app_noncritical.py @@ -1,8 +1,11 @@ # standard imports import logging import os +import uuid +import time # external imports +import celery import pytest from hexathon import ( strip_0x, @@ -14,6 +17,7 @@ from cic_eth.api.api_task import Api from cic_eth.task import BaseTask from cic_eth.error import TrustError from cic_eth.encode import tx_normalize +from cic_eth.pytest.mock.callback import CallbackTask logg = logging.getLogger() @@ -48,16 +52,17 @@ def test_tokens( custodial_roles, foo_token_declaration, bar_token_declaration, - celery_session_worker, + celery_worker, ): - api = Api(str(default_chain_spec), queue=None) + api = Api(str(default_chain_spec), queue=None, callback_param='foo') - t = api.token('FOO', proof=foo_token_declaration) + t = api.tokens(['FOO'], proof=[[foo_token_declaration]]) r = t.get() - logg.debug('r {}'.format(r)) + logg.debug('rr {}'.format(r)) assert len(r) == 1 - + assert r[0]['address'] == strip_0x(foo_token) + t = api.tokens(['BAR', 'FOO'], proof=[[bar_token_declaration], [foo_token_declaration]]) r = t.get() logg.debug('results {}'.format(r)) @@ -65,8 +70,21 @@ def test_tokens( assert r[1]['address'] == strip_0x(foo_token) assert r[0]['address'] == strip_0x(bar_token) + celery_app = celery.current_app + + api_param = str(uuid.uuid4()) + api = Api(str(default_chain_spec), queue=None, callback_param=api_param, callback_task='cic_eth.pytest.mock.callback.test_error_callback') bogus_proof = os.urandom(32).hex() - with pytest.raises(TrustError): - t = api.token('FOO', proof=bogus_proof) - r = t.get_leaf() - logg.debug('should raise {}'.format(r)) + t = api.tokens(['FOO'], proof=[[bogus_proof]]) + r = t.get() + logg.debug('r {}'.format(r)) + assert len(CallbackTask.errs[api_param]) == 1 + assert CallbackTask.oks.get(api_param) == None + + api_param = str(uuid.uuid4()) + api = Api(str(default_chain_spec), queue=None, callback_param=api_param, callback_task='cic_eth.pytest.mock.callback.test_error_callback') + t = api.tokens(['BAR'], proof=[[bar_token_declaration]]) + r = t.get() + logg.debug('rr {} {}'.format(r, t.children)) + time.sleep(0.1) + assert len(CallbackTask.oks[api_param]) == 1