# standard imports import os import unittest import json import logging # external imports from chainlib.eth.unittest.ethtester import EthTesterCase from chainlib.connection import RPCConnection from chainlib.eth.nonce import RPCNonceOracle from chainlib.eth.address import to_checksum_address from chainlib.eth.tx import ( receipt, transaction, TxFormat, ) from chainlib.eth.contract import ( abi_decode_single, ABIContractType, ) # local imports from eth_accounts_index.registry import AccountRegistry from eth_accounts_index import AccountsIndex logging.basicConfig(level=logging.DEBUG) logg = logging.getLogger() testdir = os.path.dirname(__file__) class TestNonceOracle: def __init__(self, address, default_value=0): self.nonce = default_value def next_nonce(self): nonce = self.nonce self.nonce += 1 return nonce class Test(EthTesterCase): def setUp(self): super(Test, self).setUp() nonce_oracle = TestNonceOracle(self.accounts[0]) c = AccountRegistry(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle) (tx_hash, o) = c.constructor(self.accounts[0]) self.conn = RPCConnection.connect(self.chain_spec, 'default') r = self.conn.do(o) logg.debug('deployed with hash {}'.format(r)) o = receipt(r) r = self.conn.do(o) self.address = to_checksum_address(r['contract_address']) (tx_hash, o) = c.add_writer(self.address, self.accounts[0], self.accounts[0]) r = self.conn.do(o) o = receipt(r) r = self.conn.do(o) self.assertEqual(r['status'], 1) def test_1_count(self): #o = self.o.count(self.address, sender_address=self.accounts[0]) c = AccountsIndex(self.chain_spec) o = c.entry_count(self.address, sender_address=self.accounts[0]) r = self.conn.do(o) r = abi_decode_single(ABIContractType.UINT256, r) self.assertEqual(r, 0) def test_2_add(self): b = os.urandom(20) a = to_checksum_address(b.hex()) nonce_oracle = RPCNonceOracle(self.accounts[0], self.conn) c = AccountsIndex(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle) (tx_hash, o) = c.add(self.address, self.accounts[0], a) r = self.conn.do(o) self.assertEqual(tx_hash, r) o = receipt(tx_hash) rcpt = self.conn.do(o) self.helper.mine_block() o = c.have(self.address, a, sender_address=self.accounts[0]) r = self.conn.do(o) def test_3_add_rlpsigned(self): b = os.urandom(20) a = to_checksum_address(b.hex()) nonce_oracle = RPCNonceOracle(self.accounts[0], self.conn) c = AccountsIndex(self.chain_spec, signer=self.signer, nonce_oracle=nonce_oracle) (tx_hash, o) = c.add(self.address, self.accounts[0], a, tx_format=TxFormat.RLP_SIGNED) #r = self.conn.do(o) #self.assertEqual(tx_hash, r) logg.debug('o {}'.format(o)) # TODO: reinstate all tests # def test_access(self): # registry = AccountRegistry(self.w3, self.address, self.w3.eth.accounts[1]) # registry.add(self.w3.eth.accounts[2]) # self.eth_tester.mine_block() # self.assertEqual(registry.count(), 2) # # # account 2 does not have access # registry = AccountRegistry(self.w3, self.address, self.w3.eth.accounts[2]) # registry.add(self.w3.eth.accounts[2]) # self.eth_tester.mine_block() # self.assertEqual(registry.count(), 2) # # # after this account 2 has access # registry.contract.functions.addWriter(self.w3.eth.accounts[2]).transact() # registry.add(self.w3.eth.accounts[3]) # self.eth_tester.mine_block() # self.assertEqual(registry.count(), 3) # # # after this account 2 no longer has access # registry.contract.functions.deleteWriter(self.w3.eth.accounts[2]).transact() # registry.add(self.w3.eth.accounts[3]) # self.eth_tester.mine_block() # self.assertEqual(registry.count(), 3) # # # def test_indices(self): # registry = AccountRegistry(self.w3, self.address, self.w3.eth.accounts[1]) # registry.add(self.w3.eth.accounts[2]) # # self.assertTrue(registry.have(self.w3.eth.accounts[2])) # self.assertFalse(registry.have(self.w3.eth.accounts[3])) # # # def test_no_duplicates(self): # registry = AccountRegistry(self.w3, self.address, self.w3.eth.accounts[1]) # tx_hash = registry.add(self.w3.eth.accounts[2]) # self.eth_tester.mine_block() # tx_hash = registry.add(self.w3.eth.accounts[3]) # self.eth_tester.mine_block() # # BUG: eth_tester does not detect the duplicate here, but does in the test.py file in the solidity folder # #self.assertRaises(Exception): # tx_hash = registry.add(self.w3.eth.accounts[2]) # self.eth_tester.mine_block() # # # def test_list(self): # registry = AccountRegistry(self.w3, self.address, self.w3.eth.accounts[1]) # # for i in range(2, 10): # registry.add(self.w3.eth.accounts[i]) # # self.assertEqual(registry.count(), 9) # # accounts_reverse = [] # for i in range(9, 1, -1): # accounts_reverse.append(self.w3.eth.accounts[i]) # # accounts_list = registry.last(8) # for i in range(8): # self.assertEqual(accounts_list[i], accounts_reverse[i]) # if __name__ == '__main__': unittest.main()