From 4f753b990e94e7082bd10ef1ed577601e20749ca Mon Sep 17 00:00:00 2001 From: nolash Date: Sun, 21 Mar 2021 13:27:42 +0100 Subject: [PATCH] Rehabilitate address declarator with chainlib code, with tests --- python/eth_address_declarator/declarator.py | 179 ++++++++++++++----- python/eth_token_index/__init__.py | 5 +- python/eth_token_index/index.py | 17 +- python/tests/test_addressdeclarator.py | 188 ++++++++++++++++++++ python/tests/test_tokenendorse.py | 147 --------------- 5 files changed, 330 insertions(+), 206 deletions(-) create mode 100644 python/tests/test_addressdeclarator.py delete mode 100644 python/tests/test_tokenendorse.py diff --git a/python/eth_address_declarator/declarator.py b/python/eth_address_declarator/declarator.py index ef0ae31..70d2871 100644 --- a/python/eth_address_declarator/declarator.py +++ b/python/eth_address_declarator/declarator.py @@ -9,31 +9,40 @@ import json import os import hashlib +# external imports +from hexathon import ( + strip_0x, + add_0x, + ) +from chainlib.eth.tx import ( + TxFormat, + TxFactory, + ) +from chainlib.eth.contract import ( + ABIContractEncoder, + ABIContractType, + abi_decode_single, + ) +from chainlib.jsonrpc import jsonrpc_template +from chainlib.eth.constant import ZERO_ADDRESS + logg = logging.getLogger(__name__) moddir = os.path.dirname(__file__) datadir = os.path.join(moddir, 'data') -class AddressDeclarator: +def to_declarator_key(declarator_address_hex, declaration_address_hex): + h = hashlib.new('sha256') + h.update(bytes.fromhex(strip_0x(declaration_address_hex))) + h.update(bytes.fromhex(strip_0x(declarator_address_hex))) + return h.digest() + + +class AddressDeclarator(TxFactory): __abi = None __bytecode = None - __address = None - - def __init__(self, w3, address, signer_address=None): - abi = AddressDeclarator.abi() - AddressDeclarator.bytecode() - self.__address = address - self.contract = w3.eth.contract(abi=abi, address=address) - self.w3 = w3 - if signer_address != None: - self.signer_address = signer_address - else: - if type(self.w3.eth.defaultAccount).__name__ == 'Empty': - self.w3.eth.defaultAccount = self.w3.eth.accounts[0] - self.signer_address = self.w3.eth.defaultAccount - @staticmethod def abi(): @@ -53,32 +62,118 @@ class AddressDeclarator: return AddressDeclarator.__bytecode -# def token_from_symbol(self, symbol): -# return self.contract.functions.tokenSymbolIndex(symbol).call() -# def + def constructor(self, sender_address, initial_description): + code = AddressDeclarator.bytecode() + enc = ABIContractEncoder() + initial_description_hex = add_0x(initial_description) + enc.bytes32(initial_description_hex) + code += enc.get() + tx = self.template(sender_address, None, use_nonce=True) + tx = self.set_code(tx, code) + return self.build(tx) -# -# def endorsed_tokens(self, endorser_address): -# tokens = [] -# for i in range(self.contract.functions.endorserTokenCount(endorser_address).call()): -# tidx = self.contract.functions.endorser(endorser_address, i).call() -# t = self.contract.functions.tokens(tidx).call() -# tokens.append(t) -# return tokens -# -# def declared(self, declarator_address): -# addresses = [] -# for i -# -# -# def add(self, token_address, data): -# self.contract.functions.add(token_address, data).transact({'from': self.signer_address}) -# - -def to_declarator_key(declarator_address_hex, declaration_address_hex): - h = hashlib.new('sha256') - h.update(bytes.fromhex(token_address_hex[2:])) - h.update(bytes.fromhex(endorser_address_hex[2:])) - return h.digest() + + def add_declaration(self, contract_address, sender_address, subject_address, proof, tx_format=TxFormat.JSONRPC): + enc = ABIContractEncoder() + enc.method('addDeclaration') + enc.typ(ABIContractType.ADDRESS) + enc.typ(ABIContractType.BYTES32) + enc.address(subject_address) + enc.bytes32(proof) + data = enc.get() + tx = self.template(sender_address, contract_address, use_nonce=True) + tx = self.set_code(tx, data) + tx = self.finalize(tx, tx_format) + return tx + def declarator_count(self, contract_address, subject_address, sender_address=ZERO_ADDRESS): + o = jsonrpc_template() + o['method'] = 'eth_call' + enc = ABIContractEncoder() + enc.method('declaratorCount') + enc.typ(ABIContractType.ADDRESS) + enc.address(subject_address) + data = add_0x(enc.get()) + tx = self.template(sender_address, contract_address) + tx = self.set_code(tx, data) + o['params'].append(self.normalize(tx)) + return o + + + def declaration(self, contract_address, declarator_address, subject_address, sender_address=ZERO_ADDRESS): + o = jsonrpc_template() + o['method'] = 'eth_call' + enc = ABIContractEncoder() + enc.method('declaration') + enc.typ(ABIContractType.ADDRESS) + enc.typ(ABIContractType.ADDRESS) + enc.address(declarator_address) + enc.address(subject_address) + data = add_0x(enc.get()) + tx = self.template(sender_address, contract_address) + tx = self.set_code(tx, data) + o['params'].append(self.normalize(tx)) + return o + + + def declaration_address_at(self, contract_address, declarator_address, idx, sender_address=ZERO_ADDRESS): + o = jsonrpc_template() + o['method'] = 'eth_call' + enc = ABIContractEncoder() + enc.method('declarationAddressAt') + enc.typ(ABIContractType.ADDRESS) + enc.typ(ABIContractType.UINT256) + enc.address(declarator_address) + enc.uint256(idx) + data = add_0x(enc.get()) + tx = self.template(sender_address, contract_address) + tx = self.set_code(tx, data) + o['params'].append(self.normalize(tx)) + return o + + + def declarator_address_at(self, contract_address, subject_address, idx, sender_address=ZERO_ADDRESS): + o = jsonrpc_template() + o['method'] = 'eth_call' + enc = ABIContractEncoder() + enc.method('declaratorAddressAt') + enc.typ(ABIContractType.ADDRESS) + enc.typ(ABIContractType.UINT256) + enc.address(subject_address) + enc.uint256(idx) + data = add_0x(enc.get()) + tx = self.template(sender_address, contract_address) + tx = self.set_code(tx, data) + o['params'].append(self.normalize(tx)) + return o + + + @classmethod + def parse_declarator_count(self, v): + return abi_decode_single(ABIContractType.UINT256, v) + + + @classmethod + def parse_declaration(self, v): + cursor = 0 + v = strip_0x(v) + position = int.from_bytes(bytes.fromhex(v[cursor:cursor+64]), 'big') + cursor += (position * 2) + length = int.from_bytes(bytes.fromhex(v[cursor:cursor+64]), 'big') + cursor += 64 + r = [] + for i in range(length): + r.append(v[cursor:cursor+64]) + cursor += 64 + return r + + + @classmethod + def parse_declaration_address_at(self, v): + return abi_decode_single(ABIContractType.ADDRESS, v) + + + @classmethod + def parse_declarator_address_at(self, v): + return abi_decode_single(ABIContractType.ADDRESS, v) diff --git a/python/eth_token_index/__init__.py b/python/eth_token_index/__init__.py index 820625b..68ff528 100644 --- a/python/eth_token_index/__init__.py +++ b/python/eth_token_index/__init__.py @@ -1 +1,4 @@ -from .index import TokenUniqueSymbolIndex +from .index import ( + TokenUniqueSymbolIndex, + to_identifier, + ) diff --git a/python/eth_token_index/index.py b/python/eth_token_index/index.py index 21a697a..66d1564 100644 --- a/python/eth_token_index/index.py +++ b/python/eth_token_index/index.py @@ -41,8 +41,7 @@ class TokenUniqueSymbolIndex(TxFactory): __abi = None __bytecode = None - __address = None - __erc20_abi = None + @staticmethod def abi(): @@ -135,17 +134,3 @@ class TokenUniqueSymbolIndex(TxFactory): @classmethod def parse_entry_count(self, v): return abi_decode_single(ABIContractType.UINT256, v) - - -# def count(self): -# return self.contract.functions.registryCount().call() -# -# -# def get_index(self, idx): -# return self.contract.functions.entry(idx).call() -# -# -# def get_token_by_symbol(self, symbol): -# ref = to_ref(symbol) -# return self.contract.functions.addressOf(symbol).call() - diff --git a/python/tests/test_addressdeclarator.py b/python/tests/test_addressdeclarator.py new file mode 100644 index 0000000..6e09147 --- /dev/null +++ b/python/tests/test_addressdeclarator.py @@ -0,0 +1,188 @@ +# standard imports +import os +import unittest +import json +import logging +import hashlib + +# external imports +from chainlib.eth.unittest.ethtester import EthTesterCase +from chainlib.eth.contract import ( + ABIContractEncoder, + ABIContractType, + ) +from chainlib.eth.nonce import RPCNonceOracle +from chainlib.eth.tx import receipt +from giftable_erc20_token import GiftableToken +from hexathon import add_0x + +# local imports +from eth_address_declarator import AddressDeclarator + +logging.basicConfig(level=logging.DEBUG) +logg = logging.getLogger() + +logging.getLogger('web3').setLevel(logging.WARNING) +logging.getLogger('eth.vm').setLevel(logging.WARNING) + +testdir = os.path.dirname(__file__) + +description = '0x{:<064s}'.format(b'foo'.hex()) + +class Test(EthTesterCase): + + def setUp(self): + super(Test, self).setUp() + self.description = add_0x(os.urandom(32).hex()) + nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc) + c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle, chain_id=self.chain_spec.chain_id()) + (tx_hash_hex, o) = c.constructor(self.accounts[0], self.description) + self.rpc.do(o) + + o = receipt(tx_hash_hex) + r = self.rpc.do(o) + self.assertEqual(r['status'], 1) + + self.address = r['contract_address'] + + c = GiftableToken(signer=self.signer, nonce_oracle=nonce_oracle, chain_id=self.chain_spec.chain_id()) + (tx_hash_hex, o) = c.constructor(self.accounts[0], 'FooToken', 'FOO', 6) + self.rpc.do(o) + + o = receipt(tx_hash_hex) + r = self.rpc.do(o) + self.assertEqual(r['status'], 1) + + self.foo_token_address = r['contract_address'] + + c = GiftableToken(signer=self.signer, nonce_oracle=nonce_oracle, chain_id=self.chain_spec.chain_id()) + (tx_hash_hex, o) = c.constructor(self.accounts[0], 'BarToken', 'BAR', 6) + self.rpc.do(o) + + o = receipt(tx_hash_hex) + r = self.rpc.do(o) + self.assertEqual(r['status'], 1) + + self.bar_token_address = r['contract_address'] + + + + def test_basic(self): + + d = add_0x(os.urandom(32).hex()) + + nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc) + c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle) + (tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[0], self.foo_token_address, d) + self.rpc.do(o) + + nonce_oracle = RPCNonceOracle(self.accounts[1], self.rpc) + c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle) + (tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[1], self.foo_token_address, d) + self.rpc.do(o) + + nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc) + c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle) + (tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[0], self.bar_token_address, d) + self.rpc.do(o) + + o = c.declarator_count(self.address, self.foo_token_address, sender_address=self.accounts[0]) + r = self.rpc.do(o) + self.assertEqual(c.parse_declarator_count(r), 2) + + o = c.declarator_count(self.address, self.bar_token_address, sender_address=self.accounts[0]) + r = self.rpc.do(o) + self.assertEqual(c.parse_declarator_count(r), 1) + + + def test_declaration(self): + + d = add_0x(os.urandom(32).hex()) + d_two = add_0x(os.urandom(32).hex()) + + nonce_oracle = RPCNonceOracle(self.accounts[1], self.rpc) + c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle) + (tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[1], self.foo_token_address, d) + self.rpc.do(o) + + c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle) + (tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[1], self.foo_token_address, d_two) + self.rpc.do(o) + + nonce_oracle = RPCNonceOracle(self.accounts[2], self.rpc) + c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle) + (tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[2], self.foo_token_address, d) + self.rpc.do(o) + + c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle) + (tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[2], self.bar_token_address, d) + self.rpc.do(o) + + o = c.declaration(self.address, self.accounts[1], self.foo_token_address, sender_address=self.accounts[0]) + r = self.rpc.do(o) + proofs = c.parse_declaration(r) + self.assertEqual(proofs[0], d[2:]) + self.assertEqual(proofs[1], d_two[2:]) + + + + def test_declarator_to_subject(self): + d = add_0x(os.urandom(32).hex()) + + nonce_oracle = RPCNonceOracle(self.accounts[1], self.rpc) + c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle) + (tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[1], self.foo_token_address, d) + self.rpc.do(o) + + nonce_oracle = RPCNonceOracle(self.accounts[2], self.rpc) + c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle) + (tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[2], self.foo_token_address, d) + self.rpc.do(o) + + nonce_oracle = RPCNonceOracle(self.accounts[1], self.rpc) + c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle) + (tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[1], self.bar_token_address, d) + self.rpc.do(o) + + o = c.declaration_address_at(self.address, self.accounts[1], 0, sender_address=self.accounts[0]) + r = self.rpc.do(o) + self.assertEqual(c.parse_declaration_address_at(r), self.foo_token_address) + + o = c.declaration_address_at(self.address, self.accounts[2], 0, sender_address=self.accounts[0]) + r = self.rpc.do(o) + self.assertEqual(c.parse_declaration_address_at(r), self.foo_token_address) + + o = c.declaration_address_at(self.address, self.accounts[1], 1, sender_address=self.accounts[0]) + r = self.rpc.do(o) + self.assertEqual(c.parse_declaration_address_at(r), self.bar_token_address) + + + def test_subject_to_declarator(self): + d = '0x' + os.urandom(32).hex() + + nonce_oracle = RPCNonceOracle(self.accounts[1], self.rpc) + c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle) + (tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[1], self.foo_token_address, d) + self.rpc.do(o) + + nonce_oracle = RPCNonceOracle(self.accounts[2], self.rpc) + c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle) + (tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[2], self.foo_token_address, d) + self.rpc.do(o) + + nonce_oracle = RPCNonceOracle(self.accounts[1], self.rpc) + c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle) + (tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[1], self.bar_token_address, d) + self.rpc.do(o) + + o = c.declarator_address_at(self.address, self.foo_token_address, 0, sender_address=self.accounts[0]) + r = self.rpc.do(o) + self.assertEqual(c.parse_declaration_address_at(r), self.accounts[1]) + + o = c.declarator_address_at(self.address, self.foo_token_address, 1, sender_address=self.accounts[0]) + r = self.rpc.do(o) + self.assertEqual(c.parse_declaration_address_at(r), self.accounts[2]) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/tests/test_tokenendorse.py b/python/tests/test_tokenendorse.py deleted file mode 100644 index 7076673..0000000 --- a/python/tests/test_tokenendorse.py +++ /dev/null @@ -1,147 +0,0 @@ -import os -import unittest -import json -import logging -import hashlib - -import web3 -import eth_tester -import eth_abi - -logging.basicConfig(level=logging.DEBUG) -logg = logging.getLogger() - -logging.getLogger('web3').setLevel(logging.WARNING) -logging.getLogger('eth.vm').setLevel(logging.WARNING) - -testdir = os.path.dirname(__file__) - -description = '0x{:<064s}'.format(b'foo'.hex()) - -class Test(unittest.TestCase): - - contract = None - - def setUp(self): - eth_params = eth_tester.backends.pyevm.main.get_default_genesis_params({ - 'gas_limit': 9000000, - }) - - # create store of used accounts - f = open(os.path.join(testdir, '../eth_address_declarator/data/AddressDeclarator.bin'), 'r') - bytecode = f.read() - f.close() - - f = open(os.path.join(testdir, '../eth_address_declarator/data/AddressDeclarator.json'), 'r') - self.abi = json.load(f) - f.close() - - backend = eth_tester.PyEVMBackend(eth_params) - self.eth_tester = eth_tester.EthereumTester(backend) - provider = web3.Web3.EthereumTesterProvider(self.eth_tester) - self.w3 = web3.Web3(provider) - c = self.w3.eth.contract(abi=self.abi, bytecode=bytecode) - tx_hash = c.constructor(description).transact({'from': self.w3.eth.accounts[0]}) - - r = self.w3.eth.getTransactionReceipt(tx_hash) - - self.address = r.contractAddress - - - # create token - f = open(os.path.join(testdir, '../eth_address_declarator/data/GiftableToken.bin'), 'r') - bytecode = f.read() - f.close() - - f = open(os.path.join(testdir, '../eth_address_declarator/data/GiftableToken.json'), 'r') - self.abi_token = json.load(f) - f.close() - - t = self.w3.eth.contract(abi=self.abi_token, bytecode=bytecode) - tx_hash = t.constructor('Foo Token', 'FOO', 18).transact({'from': self.w3.eth.accounts[0]}) - - r = self.w3.eth.getTransactionReceipt(tx_hash) - - self.address_token_one = r.contractAddress - - t = self.w3.eth.contract(abi=self.abi_token, bytecode=bytecode) - tx_hash = t.constructor('Bar Token', 'BAR', 18).transact({'from': self.w3.eth.accounts[0]}) - - r = self.w3.eth.getTransactionReceipt(tx_hash) - - self.address_token_two = r.contractAddress - - - def tearDown(self): - pass - - - def test_basic(self): - c = self.w3.eth.contract(abi=self.abi, address=self.address) - - d = '0x' + os.urandom(32).hex() - c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[0]}) - c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[1]}) - c.functions.addDeclaration(self.address_token_two, d).transact({'from': self.w3.eth.accounts[0]}) - - self.assertEqual(c.functions.declaratorCount(self.address_token_one).call(), 2) - self.assertEqual(c.functions.declaratorCount(self.address_token_two).call(), 1) - - - def test_declaration(self): - c = self.w3.eth.contract(abi=self.abi, address=self.address) - - d = '0x' + os.urandom(32).hex() - d_two = '0x' + os.urandom(32).hex() - c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[1]}) - c.functions.addDeclaration(self.address_token_one, d_two).transact({'from': self.w3.eth.accounts[1]}) - c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[2]}) - c.functions.addDeclaration(self.address_token_two, d).transact({'from': self.w3.eth.accounts[2]}) - - proofs = c.functions.declaration(self.w3.eth.accounts[1], self.address_token_one).call() - self.assertEqual(proofs[0].hex(), d[2:]) - self.assertEqual(proofs[1].hex(), d_two[2:]) - - - def test_declaration_count(self): - c = self.w3.eth.contract(abi=self.abi, address=self.address) - - d = '0x' + os.urandom(32).hex() - d_two = '0x' + os.urandom(32).hex() - c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[1]}) - c.functions.addDeclaration(self.address_token_one, d_two).transact({'from': self.w3.eth.accounts[1]}) - c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[2]}) - c.functions.addDeclaration(self.address_token_two, d).transact({'from': self.w3.eth.accounts[2]}) - - self.assertEqual(c.functions.declarationCount(self.w3.eth.accounts[1]).call(), 1) - self.assertEqual(c.functions.declarationCount(self.w3.eth.accounts[2]).call(), 2) - - - def test_declarator_to_subject(self): - c = self.w3.eth.contract(abi=self.abi, address=self.address) - - d = '0x' + os.urandom(32).hex() - c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[1]}) - c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[2]}) - c.functions.addDeclaration(self.address_token_two, d).transact({'from': self.w3.eth.accounts[1]}) - - - self.assertEqual(c.functions.declarationAddressAt(self.w3.eth.accounts[1], 0).call(), self.address_token_one) - self.assertEqual(c.functions.declarationAddressAt(self.w3.eth.accounts[2], 0).call(), self.address_token_one) - self.assertEqual(c.functions.declarationAddressAt(self.w3.eth.accounts[1], 1).call(), self.address_token_two) - - - def test_subject_to_declarator(self): - c = self.w3.eth.contract(abi=self.abi, address=self.address) - - d = '0x' + os.urandom(32).hex() - c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[1]}) - c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[2]}) - c.functions.addDeclaration(self.address_token_two, d).transact({'from': self.w3.eth.accounts[1]}) - - self.assertEqual(c.functions.declaratorAddressAt(self.address_token_one, 0).call(), self.w3.eth.accounts[1]) - self.assertEqual(c.functions.declaratorAddressAt(self.address_token_one, 1).call(), self.w3.eth.accounts[2]) - - -if __name__ == '__main__': - unittest.main()