cic-internal-integration/apps/cic-eth/cic_eth/ext/address.py

58 lines
1.8 KiB
Python
Raw Normal View History

2021-02-17 09:19:42 +01:00
# standard imports
import logging
# third-party imports
import celery
from chainlib.chain import ChainSpec
from chainlib.connection import RPCConnection
from chainlib.eth.constant import ZERO_ADDRESS
from cic_eth_registry import CICRegistry
2021-05-01 01:53:02 +02:00
from eth_address_declarator import Declarator
# local imports
from cic_eth.task import BaseTask
2021-02-17 09:19:42 +01:00
celery_app = celery.current_app
logg = logging.getLogger()
def translate_address(address, trusted_addresses, chain_spec, sender_address=ZERO_ADDRESS):
rpc = RPCConnection.connect(chain_spec, 'default')
registry = CICRegistry(chain_spec, rpc)
declarator_address = registry.by_name('AddressDeclarator', sender_address=sender_address)
2021-05-01 01:53:02 +02:00
c = Declarator(chain_spec)
2021-02-17 09:19:42 +01:00
for trusted_address in trusted_addresses:
o = c.declaration(declarator_address, trusted_address, address, sender_address=sender_address)
r = rpc.do(o)
2021-05-01 01:53:02 +02:00
declaration_hex = Declarator.parse_declaration(r)
declaration_hex = declaration_hex[0].rstrip('0')
declaration_bytes = bytes.fromhex(declaration_hex)
2021-02-17 09:19:42 +01:00
declaration = None
try:
declaration = declaration_bytes.decode('utf-8', errors='strict')
except UnicodeDecodeError:
continue
return declaration
@celery_app.task(bind=True, base=BaseTask)
def translate_tx_addresses(self, tx, trusted_addresses, chain_spec_dict):
2021-02-17 09:19:42 +01:00
chain_spec = ChainSpec.from_dict(chain_spec_dict)
2021-02-17 09:19:42 +01:00
declaration = None
if tx['sender_label'] == None:
declaration = translate_address(tx['sender'], trusted_addresses, chain_spec, self.call_address)
2021-02-17 09:19:42 +01:00
tx['sender_label'] = declaration
declaration = None
if tx['recipient_label'] == None:
declaration = translate_address(tx['recipient'], trusted_addresses, chain_spec, self.call_address)
2021-02-17 09:19:42 +01:00
tx['recipient_label'] = declaration
return tx