# standard imports
import unittest
import logging
import random
import os
import shutil
import tempfile

# external imports
from chainlib.chain import ChainSpec
from chainlib.eth.unittest.ethtester import EthTesterCase
from chainlib.eth.tx import (
        Tx,
        receipt,
        )
from chainlib.eth.gas import OverrideGasOracle
from chainlib.eth.nonce import RPCNonceOracle
from hexathon import (
        add_0x,
        strip_0x,
        )
from funga.eth.keystore.dict import DictKeystore
from eth_address_declarator.declarator import AddressDeclarator
from okota.token_index.index import TokenUniqueSymbolIndexAddressDeclarator

# local imports
from cic.ext.eth import CICEth
from cic import Proof
from cic.attachment import Attachment
from cic.output import KVWriter
from cic.processor import Processor

# test imports
from tests.base_cic import test_data_dir

logg = logging.getLogger(__name__)


class TestCICEthBase(EthTesterCase):
    """Base fixture deploying the contracts needed by the CIC eth extension tests.

    ``setUp`` deploys an address declarator and a token index contract, builds
    the ``resources`` mapping that ties each contract reference to a signing
    key address, loads the proof fixture from the test data directory and
    prepares a temporary output directory with a ``KVWriter`` on top of it.

    ``self.keystore``, ``self.rpc``, ``self.signer`` and ``self.chain_spec``
    are not set here; they are expected to be provided by the parent
    ``EthTesterCase.setUp``.
    """

    def setUp(self):
        """Deploy the contracts and assemble resources, proofs and output writer."""
        super(TestCICEthBase, self).setUp()

        # Fixed seed so the generated description and token address are the
        # same on every run.
        random.seed(42)
        self.initial_description = add_0x(random.randbytes(32).hex())
        self.token_address = add_0x(random.randbytes(20).hex())

        addresses = self.keystore.list()

        # The address declarator is deployed by the third key ...
        self.address_declarator_address = self._deploy_contract(
                AddressDeclarator,
                addresses[2],
                self.initial_description,
                )
        logg.debug('address declarator deployed at {}'.format(self.address_declarator_address))

        # ... and the token index, which is constructed with the declarator
        # address, by the second key.
        self.token_index_address = self._deploy_contract(
                TokenUniqueSymbolIndexAddressDeclarator,
                addresses[1],
                self.address_declarator_address,
                )
        logg.debug('token index deployed at {}'.format(self.token_index_address))

        self.resources = {
            'token': {
                'reference': self.token_address,
                'key_address': addresses[0],
                },
            'token_index': {
                'reference': self.token_index_address,
                'key_address': addresses[1],
                },
            'address_declarator': {
                'reference': self.address_declarator_address,
                'key_address': addresses[2],
                },
            }

        proof_dir = os.path.join(test_data_dir, 'proof')
        attach = Attachment(path=proof_dir)
        attach.load()
        self.proofs = Proof(proof_dir, attachments=attach)
        self.proofs.load()

        self.outputs_dir = tempfile.mkdtemp()
        # mkdtemp() never removes the directory itself; register the removal
        # immediately so it also runs when a later setUp step fails.
        self.addCleanup(shutil.rmtree, self.outputs_dir, ignore_errors=True)
        self.outputs_writer = KVWriter(self.outputs_dir)

    def _deploy_contract(self, contract_cls, deployer_address, constructor_arg):
        """Deploy ``contract_cls`` from ``deployer_address`` and return its address.

        :param contract_cls: Contract interface class exposing ``gas()`` and
            ``constructor(sender, arg)`` (``AddressDeclarator`` or
            ``TokenUniqueSymbolIndexAddressDeclarator`` in this fixture).
        :param deployer_address: Address used for the nonce oracle and passed
            as first constructor argument.
        :param constructor_arg: Second argument passed to the constructor.
        :returns: The ``contract_address`` field of the deployment receipt.
        :raises AssertionError: If the receipt status is not 1.
        """
        nonce_oracle = RPCNonceOracle(deployer_address, self.rpc)
        gas_oracle = OverrideGasOracle(limit=contract_cls.gas(), conn=self.rpc)
        contract = contract_cls(
                self.chain_spec,
                self.signer,
                nonce_oracle=nonce_oracle,
                gas_oracle=gas_oracle,
                )
        (_, o) = contract.constructor(deployer_address, constructor_arg)

        # Submit the deployment, then query its receipt using the result of
        # the first call.
        r = self.rpc.do(o)
        o = receipt(r)
        r = self.rpc.do(o)
        # Return value is ignored; presumably normalizes the receipt dict in
        # place before the fields below are read.
        Tx.src_normalize(r)
        self.assertEqual(r['status'], 1)
        return r['contract_address']


class TestCICEthTokenBase(TestCICEthBase):
    """Fixture extending the base with a ``CICEth`` adapter and a ``Processor``.

    The token reference in ``resources`` is reset to ``None`` before the
    adapter is created, and default token name, symbol and precision values
    are made available to the tests.
    """

    def setUp(self):
        """Create the adapter and processor on top of the base fixture."""
        super(TestCICEthTokenBase, self).setUp()

        # Clear the random token address set by the base fixture.
        self.resources['token']['reference'] = None
        self.adapter = CICEth(self.chain_spec, self.resources, self.proofs, outputs_writer=self.outputs_writer)
        self.token_name = 'FOotoken'
        self.token_symbol = 'FOO'
        self.token_precision = 8
        self.core_processor = Processor(outputs_writer=self.outputs_writer, extensions=[self.adapter])