# standard imports import os import logging # external imports import celery import pytest from chainlib.eth.tx import ( unpack, TxFormat, ) from chainlib.eth.nonce import RPCNonceOracle from chainlib.eth.gas import Gas from chainlib.eth.address import to_checksum_address from hexathon import ( strip_0x, add_0x, ) # local imports from cic_eth.api import AdminApi from cic_eth.db.models.role import AccountRole from cic_eth.db.models.otx import Otx from cic_eth.db.models.tx import TxCache from cic_eth.db.enum import ( StatusEnum, StatusBits, status_str, LockEnum, ) from cic_eth.error import InitializationError from cic_eth.eth.tx import ( cache_gas_data, ) #from cic_eth.eth.gas import cache_gas_tx from cic_eth.queue.tx import ( create as queue_create, get_tx, ) logg = logging.getLogger() #def test_resend_inplace( # default_chain_spec, # init_database, # init_w3, # celery_session_worker, # ): # # chain_str = str(default_chain_spec) # c = RpcClient(default_chain_spec) # # sigs = [] # # gas_provider = c.gas_provider() # # s_nonce = celery.signature( # 'cic_eth.eth.tx.reserve_nonce', # [ # init_w3.eth.accounts[0], # gas_provider, # ], # queue=None, # ) # s_refill = celery.signature( # 'cic_eth.eth.tx.refill_gas', # [ # chain_str, # ], # queue=None, # ) # s_nonce.link(s_refill) # t = s_nonce.apply_async() # t.get() # for r in t.collect(): # pass # assert t.successful() # # q = init_database.query(Otx) # q = q.join(TxCache) # q = q.filter(TxCache.recipient==init_w3.eth.accounts[0]) # o = q.first() # tx_raw = o.signed_tx # # tx_dict = unpack_signed_raw_tx(bytes.fromhex(tx_raw[2:]), default_chain_spec.chain_id()) # gas_price_before = tx_dict['gasPrice'] # # s = celery.signature( # 'cic_eth.admin.ctrl.lock_send', # [ # chain_str, # init_w3.eth.accounts[0], # ], # queue=None, # ) # t = s.apply_async() # t.get() # assert t.successful() # # api = AdminApi(c, queue=None) # t = api.resend(tx_dict['hash'], chain_str, unlock=True) # t.get() # i = 0 # tx_hash_new_hex = None # for r in t.collect(): # tx_hash_new_hex = r[1] # assert t.successful() # # tx_raw_new = get_tx(tx_hash_new_hex) # logg.debug('get {}'.format(tx_raw_new)) # tx_dict_new = unpack_signed_raw_tx(bytes.fromhex(tx_raw_new['signed_tx'][2:]), default_chain_spec.chain_id()) # assert tx_hash_new_hex != tx_dict['hash'] # assert tx_dict_new['gasPrice'] > gas_price_before # # tx_dict_after = get_tx(tx_dict['hash']) # # logg.debug('logggg {}'.format(status_str(tx_dict_after['status']))) # assert tx_dict_after['status'] & StatusBits.MANUAL #def test_check_fix_nonce( # default_chain_spec, # init_database, # init_eth_account_roles, # init_w3, # eth_empty_accounts, # celery_session_worker, # ): # # chain_str = str(default_chain_spec) # # sigs = [] # for i in range(5): # s = celery.signature( # 'cic_eth.eth.tx.refill_gas', # [ # eth_empty_accounts[i], # chain_str, # ], # queue=None, # ) # sigs.append(s) # # t = celery.group(sigs)() # txs = t.get() # assert t.successful() # # tx_hash = web3.Web3.keccak(hexstr=txs[2]) # c = RpcClient(default_chain_spec) # api = AdminApi(c, queue=None) # address = init_eth_account_roles['eth_account_gas_provider'] # nonce_spec = api.check_nonce(address) # assert nonce_spec['nonce']['network'] == 0 # assert nonce_spec['nonce']['queue'] == 4 # assert nonce_spec['nonce']['blocking'] == None # # s_set = celery.signature( # 'cic_eth.queue.tx.set_rejected', # [ # tx_hash.hex(), # ], # queue=None, # ) # t = s_set.apply_async() # t.get() # t.collect() # assert t.successful() # # # nonce_spec = api.check_nonce(address) # assert nonce_spec['nonce']['blocking'] == 2 # assert nonce_spec['tx']['blocking'] == tx_hash.hex() # # t = api.fix_nonce(address, nonce_spec['nonce']['blocking']) # t.get() # t.collect() # assert t.successful() # # for tx in txs[3:]: # tx_hash = web3.Web3.keccak(hexstr=tx) # tx_dict = get_tx(tx_hash.hex()) # assert tx_dict['status'] == StatusEnum.OVERRIDDEN # # def test_have_account( default_chain_spec, custodial_roles, init_celery_tasks, eth_rpc, celery_session_worker, ): api = AdminApi(None, queue=None) t = api.have_account(custodial_roles['ALICE'], default_chain_spec) assert t.get() != None bogus_address = add_0x(to_checksum_address(os.urandom(20).hex())) api = AdminApi(None, queue=None) t = api.have_account(bogus_address, default_chain_spec) assert t.get() == None def test_locking( default_chain_spec, init_database, agent_roles, init_celery_tasks, celery_session_worker, ): api = AdminApi(None, queue=None) t = api.lock(default_chain_spec, agent_roles['ALICE'], LockEnum.SEND) t.get() t = api.get_lock() r = t.get() assert len(r) == 1 t = api.unlock(default_chain_spec, agent_roles['ALICE'], LockEnum.SEND) t.get() t = api.get_lock() r = t.get() assert len(r) == 0 def test_tag_account( default_chain_spec, init_database, agent_roles, eth_rpc, init_celery_tasks, celery_session_worker, ): api = AdminApi(eth_rpc, queue=None) t = api.tag_account('foo', agent_roles['ALICE'], default_chain_spec) t.get() t = api.tag_account('bar', agent_roles['BOB'], default_chain_spec) t.get() t = api.tag_account('bar', agent_roles['CAROL'], default_chain_spec) t.get() assert AccountRole.get_address('foo', init_database) == agent_roles['ALICE'] assert AccountRole.get_address('bar', init_database) == agent_roles['CAROL'] #def test_ready( # init_database, # agent_roles, # eth_rpc, # ): # # api = AdminApi(eth_rpc) # # with pytest.raises(InitializationError): # api.ready() # # bogus_account = os.urandom(20) # bogus_account_hex = '0x' + bogus_account.hex() # # api.tag_account('ETH_GAS_PROVIDER_ADDRESS', web3.Web3.toChecksumAddress(bogus_account_hex)) # with pytest.raises(KeyError): # api.ready() # # api.tag_account('ETH_GAS_PROVIDER_ADDRESS', eth_empty_accounts[0]) # api.ready() def test_tx( default_chain_spec, cic_registry, init_database, eth_rpc, eth_signer, agent_roles, contract_roles, celery_session_worker, ): chain_id = default_chain_spec.chain_id() nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc) c = Gas(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id) (tx_hash_hex, tx_signed_raw_hex) = c.create(agent_roles['ALICE'], agent_roles['BOB'], 1024, tx_format=TxFormat.RLP_SIGNED) tx = unpack(bytes.fromhex(strip_0x(tx_signed_raw_hex)), chain_id) queue_create(tx['nonce'], agent_roles['ALICE'], tx_hash_hex, tx_signed_raw_hex, default_chain_spec, session=init_database) cache_gas_data(tx_hash_hex, tx_signed_raw_hex, default_chain_spec.asdict()) api = AdminApi(eth_rpc, queue=None, call_address=contract_roles['DEFAULT']) tx = api.tx(default_chain_spec, tx_hash=tx_hash_hex) logg.warning('code missing to verify tx contents {}'.format(tx))