Move token tasks to cic_eth.eth.erc20

This commit is contained in:
nolash 2021-10-08 19:21:47 +02:00
parent 6ccffb15b6
commit df21db958b
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
3 changed files with 65 additions and 82 deletions

View File

@ -1,81 +1,2 @@
# standard imports
import logging
# external imports
import celery
from chainlib.connection import RPCConnection
from chainlib.chain import ChainSpec
from cic_eth_registry.erc20 import ERC20Token
from hexathon import add_0x
from eth_address_declarator import Declarator
from cic_eth_registry import CICRegistry
from okota.token_index import to_identifier
# local imports
from cic_eth.task import (
BaseTask,
)
from cic_eth.db.models.role import AccountRole
from cic_eth.error import TrustError
celery_app = celery.current_app
logg = logging.getLogger()
@celery_app.task(bind=True, base=BaseTask)
def default_token(self):
return {
'symbol': self.default_token_symbol,
'address': self.default_token_address,
'name': self.default_token_name,
'decimals': self.default_token_decimals,
}
@celery_app.task(bind=True, base=BaseTask)
def token(self, tokens, chain_spec_dict):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
rpc = RPCConnection.connect(chain_spec, 'default')
declarator = Declarator(chain_spec)
session = self.create_session()
sender_address = AccountRole.get_address('DEFAULT', session)
sender_address = AccountRole.get_address('DEFAULT', session)
registry = CICRegistry(chain_spec, rpc)
declarator_address = registry.by_name('AddressDeclarator', sender_address=sender_address)
have_proof = False
result_data = []
for token in tokens:
token_chain_object = ERC20Token(chain_spec, rpc, add_0x(token['address']))
token_chain_object.load(rpc)
token_data = {
'decimals': token_chain_object.decimals,
'name': token_chain_object.name,
'symbol': token_chain_object.symbol,
'address': token_chain_object.address,
'declaration': {},
}
token_proof_hex = to_identifier(token_chain_object.symbol)
logg.debug('token proof to match is {}'.format(token_proof_hex))
for trusted_address in self.trusted_addresses:
o = declarator.declaration(declarator_address, trusted_address, token_chain_object.address, sender_address=sender_address)
r = rpc.do(o)
declarations = declarator.parse_declaration(r)
token_data['declaration'][trusted_address] = declarations
logg.debug('declarations for {} by {}: {}'.format(token_chain_object.address, trusted_address, declarations))
for declaration in declarations:
if declaration == token_proof_hex:
logg.debug('have token proof {} match for trusted address {}'.format(declaration, trusted_address))
have_proof = True
if not have_proof:
raise TrustError('no proof found for token {}'.format(token_chain_object.symbol))
result_data.append(token_data)
return result_data
from cic_eth.eth.erc20 import default_token

View File

@ -25,7 +25,7 @@ class Api(ApiBase):
def default_token(self):
s_token = celery.signature(
'cic_eth.admin.token.default_token',
'cic_eth.eth.erc20.default_token',
[],
queue=self.queue,
)
@ -51,7 +51,7 @@ class Api(ApiBase):
)
s_token = celery.signature(
'cic_eth.admin.token.token',
'cic_eth.eth.erc20.token_info',
[
chain_spec_dict,
],

View File

@ -19,6 +19,8 @@ from hexathon import (
from chainqueue.error import NotLocalTxError
from eth_erc20 import ERC20
from chainqueue.sql.tx import cache_tx_dict
from okota.token_index import to_identifier
from eth_address_declarator import Declarator
# local imports
from cic_eth.db.models.base import SessionBase
@ -27,6 +29,7 @@ from cic_eth.error import (
TokenCountError,
PermanentTxError,
OutOfGasError,
TrustError,
YouAreBrokeError,
)
from cic_eth.queue.tx import register_tx
@ -39,6 +42,7 @@ from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalWeb3Task,
CriticalSQLAlchemyAndSignerTask,
BaseTask,
)
from cic_eth.eth.nonce import CustodialTaskNonceOracle
from cic_eth.encode import tx_normalize
@ -473,3 +477,61 @@ def cache_approve_data(
session.close()
return (tx_hash_hex, cache_id)
@celery_app.task(bind=True, base=BaseTask)
def token_info(self, tokens, chain_spec_dict):
chain_spec = ChainSpec.from_dict(chain_spec_dict)
rpc = RPCConnection.connect(chain_spec, 'default')
declarator = Declarator(chain_spec)
session = self.create_session()
sender_address = AccountRole.get_address('DEFAULT', session)
sender_address = AccountRole.get_address('DEFAULT', session)
registry = CICRegistry(chain_spec, rpc)
declarator_address = registry.by_name('AddressDeclarator', sender_address=sender_address)
have_proof = False
result_data = []
for token in tokens:
token_chain_object = ERC20Token(chain_spec, rpc, add_0x(token['address']))
token_chain_object.load(rpc)
token_data = {
'decimals': token_chain_object.decimals,
'name': token_chain_object.name,
'symbol': token_chain_object.symbol,
'address': token_chain_object.address,
'declaration': {},
}
token_proof_hex = to_identifier(token_chain_object.symbol)
logg.debug('token proof to match is {}'.format(token_proof_hex))
for trusted_address in self.trusted_addresses:
o = declarator.declaration(declarator_address, trusted_address, token_chain_object.address, sender_address=sender_address)
r = rpc.do(o)
declarations = declarator.parse_declaration(r)
token_data['declaration'][trusted_address] = declarations
logg.debug('declarations for {} by {}: {}'.format(token_chain_object.address, trusted_address, declarations))
for declaration in declarations:
if declaration == token_proof_hex:
logg.debug('have token proof {} match for trusted address {}'.format(declaration, trusted_address))
have_proof = True
if not have_proof:
raise TrustError('no proof found for token {}'.format(token_chain_object.symbol))
result_data.append(token_data)
return result_data
@celery_app.task(bind=True, base=BaseTask)
def default_token(self):
return {
'symbol': self.default_token_symbol,
'address': self.default_token_address,
'name': self.default_token_name,
'decimals': self.default_token_decimals,
}