Rehabilitate address declarator with chainlib code, with tests

This commit is contained in:
nolash 2021-03-21 13:27:42 +01:00
parent 4095c28221
commit 4f753b990e
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
5 changed files with 330 additions and 206 deletions

View File

@ -9,31 +9,40 @@ import json
import os
import hashlib
# external imports
from hexathon import (
strip_0x,
add_0x,
)
from chainlib.eth.tx import (
TxFormat,
TxFactory,
)
from chainlib.eth.contract import (
ABIContractEncoder,
ABIContractType,
abi_decode_single,
)
from chainlib.jsonrpc import jsonrpc_template
from chainlib.eth.constant import ZERO_ADDRESS
logg = logging.getLogger(__name__)
moddir = os.path.dirname(__file__)
datadir = os.path.join(moddir, 'data')
class AddressDeclarator:
def to_declarator_key(declarator_address_hex, declaration_address_hex):
h = hashlib.new('sha256')
h.update(bytes.fromhex(strip_0x(declaration_address_hex)))
h.update(bytes.fromhex(strip_0x(declarator_address_hex)))
return h.digest()
class AddressDeclarator(TxFactory):
__abi = None
__bytecode = None
__address = None
def __init__(self, w3, address, signer_address=None):
abi = AddressDeclarator.abi()
AddressDeclarator.bytecode()
self.__address = address
self.contract = w3.eth.contract(abi=abi, address=address)
self.w3 = w3
if signer_address != None:
self.signer_address = signer_address
else:
if type(self.w3.eth.defaultAccount).__name__ == 'Empty':
self.w3.eth.defaultAccount = self.w3.eth.accounts[0]
self.signer_address = self.w3.eth.defaultAccount
@staticmethod
def abi():
@ -53,32 +62,118 @@ class AddressDeclarator:
return AddressDeclarator.__bytecode
# def token_from_symbol(self, symbol):
# return self.contract.functions.tokenSymbolIndex(symbol).call()
# def
def constructor(self, sender_address, initial_description):
code = AddressDeclarator.bytecode()
enc = ABIContractEncoder()
initial_description_hex = add_0x(initial_description)
enc.bytes32(initial_description_hex)
code += enc.get()
tx = self.template(sender_address, None, use_nonce=True)
tx = self.set_code(tx, code)
return self.build(tx)
#
# def endorsed_tokens(self, endorser_address):
# tokens = []
# for i in range(self.contract.functions.endorserTokenCount(endorser_address).call()):
# tidx = self.contract.functions.endorser(endorser_address, i).call()
# t = self.contract.functions.tokens(tidx).call()
# tokens.append(t)
# return tokens
#
# def declared(self, declarator_address):
# addresses = []
# for i
#
#
# def add(self, token_address, data):
# self.contract.functions.add(token_address, data).transact({'from': self.signer_address})
#
def to_declarator_key(declarator_address_hex, declaration_address_hex):
h = hashlib.new('sha256')
h.update(bytes.fromhex(token_address_hex[2:]))
h.update(bytes.fromhex(endorser_address_hex[2:]))
return h.digest()
def add_declaration(self, contract_address, sender_address, subject_address, proof, tx_format=TxFormat.JSONRPC):
enc = ABIContractEncoder()
enc.method('addDeclaration')
enc.typ(ABIContractType.ADDRESS)
enc.typ(ABIContractType.BYTES32)
enc.address(subject_address)
enc.bytes32(proof)
data = enc.get()
tx = self.template(sender_address, contract_address, use_nonce=True)
tx = self.set_code(tx, data)
tx = self.finalize(tx, tx_format)
return tx
def declarator_count(self, contract_address, subject_address, sender_address=ZERO_ADDRESS):
o = jsonrpc_template()
o['method'] = 'eth_call'
enc = ABIContractEncoder()
enc.method('declaratorCount')
enc.typ(ABIContractType.ADDRESS)
enc.address(subject_address)
data = add_0x(enc.get())
tx = self.template(sender_address, contract_address)
tx = self.set_code(tx, data)
o['params'].append(self.normalize(tx))
return o
def declaration(self, contract_address, declarator_address, subject_address, sender_address=ZERO_ADDRESS):
o = jsonrpc_template()
o['method'] = 'eth_call'
enc = ABIContractEncoder()
enc.method('declaration')
enc.typ(ABIContractType.ADDRESS)
enc.typ(ABIContractType.ADDRESS)
enc.address(declarator_address)
enc.address(subject_address)
data = add_0x(enc.get())
tx = self.template(sender_address, contract_address)
tx = self.set_code(tx, data)
o['params'].append(self.normalize(tx))
return o
def declaration_address_at(self, contract_address, declarator_address, idx, sender_address=ZERO_ADDRESS):
o = jsonrpc_template()
o['method'] = 'eth_call'
enc = ABIContractEncoder()
enc.method('declarationAddressAt')
enc.typ(ABIContractType.ADDRESS)
enc.typ(ABIContractType.UINT256)
enc.address(declarator_address)
enc.uint256(idx)
data = add_0x(enc.get())
tx = self.template(sender_address, contract_address)
tx = self.set_code(tx, data)
o['params'].append(self.normalize(tx))
return o
def declarator_address_at(self, contract_address, subject_address, idx, sender_address=ZERO_ADDRESS):
o = jsonrpc_template()
o['method'] = 'eth_call'
enc = ABIContractEncoder()
enc.method('declaratorAddressAt')
enc.typ(ABIContractType.ADDRESS)
enc.typ(ABIContractType.UINT256)
enc.address(subject_address)
enc.uint256(idx)
data = add_0x(enc.get())
tx = self.template(sender_address, contract_address)
tx = self.set_code(tx, data)
o['params'].append(self.normalize(tx))
return o
@classmethod
def parse_declarator_count(self, v):
return abi_decode_single(ABIContractType.UINT256, v)
@classmethod
def parse_declaration(self, v):
cursor = 0
v = strip_0x(v)
position = int.from_bytes(bytes.fromhex(v[cursor:cursor+64]), 'big')
cursor += (position * 2)
length = int.from_bytes(bytes.fromhex(v[cursor:cursor+64]), 'big')
cursor += 64
r = []
for i in range(length):
r.append(v[cursor:cursor+64])
cursor += 64
return r
@classmethod
def parse_declaration_address_at(self, v):
return abi_decode_single(ABIContractType.ADDRESS, v)
@classmethod
def parse_declarator_address_at(self, v):
return abi_decode_single(ABIContractType.ADDRESS, v)

View File

@ -1 +1,4 @@
from .index import TokenUniqueSymbolIndex
from .index import (
TokenUniqueSymbolIndex,
to_identifier,
)

View File

@ -41,8 +41,7 @@ class TokenUniqueSymbolIndex(TxFactory):
__abi = None
__bytecode = None
__address = None
__erc20_abi = None
@staticmethod
def abi():
@ -135,17 +134,3 @@ class TokenUniqueSymbolIndex(TxFactory):
@classmethod
def parse_entry_count(self, v):
return abi_decode_single(ABIContractType.UINT256, v)
# def count(self):
# return self.contract.functions.registryCount().call()
#
#
# def get_index(self, idx):
# return self.contract.functions.entry(idx).call()
#
#
# def get_token_by_symbol(self, symbol):
# ref = to_ref(symbol)
# return self.contract.functions.addressOf(symbol).call()

View File

@ -0,0 +1,188 @@
# standard imports
import os
import unittest
import json
import logging
import hashlib
# external imports
from chainlib.eth.unittest.ethtester import EthTesterCase
from chainlib.eth.contract import (
ABIContractEncoder,
ABIContractType,
)
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.tx import receipt
from giftable_erc20_token import GiftableToken
from hexathon import add_0x
# local imports
from eth_address_declarator import AddressDeclarator
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
logging.getLogger('web3').setLevel(logging.WARNING)
logging.getLogger('eth.vm').setLevel(logging.WARNING)
testdir = os.path.dirname(__file__)
description = '0x{:<064s}'.format(b'foo'.hex())
class Test(EthTesterCase):
def setUp(self):
super(Test, self).setUp()
self.description = add_0x(os.urandom(32).hex())
nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc)
c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle, chain_id=self.chain_spec.chain_id())
(tx_hash_hex, o) = c.constructor(self.accounts[0], self.description)
self.rpc.do(o)
o = receipt(tx_hash_hex)
r = self.rpc.do(o)
self.assertEqual(r['status'], 1)
self.address = r['contract_address']
c = GiftableToken(signer=self.signer, nonce_oracle=nonce_oracle, chain_id=self.chain_spec.chain_id())
(tx_hash_hex, o) = c.constructor(self.accounts[0], 'FooToken', 'FOO', 6)
self.rpc.do(o)
o = receipt(tx_hash_hex)
r = self.rpc.do(o)
self.assertEqual(r['status'], 1)
self.foo_token_address = r['contract_address']
c = GiftableToken(signer=self.signer, nonce_oracle=nonce_oracle, chain_id=self.chain_spec.chain_id())
(tx_hash_hex, o) = c.constructor(self.accounts[0], 'BarToken', 'BAR', 6)
self.rpc.do(o)
o = receipt(tx_hash_hex)
r = self.rpc.do(o)
self.assertEqual(r['status'], 1)
self.bar_token_address = r['contract_address']
def test_basic(self):
d = add_0x(os.urandom(32).hex())
nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc)
c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[0], self.foo_token_address, d)
self.rpc.do(o)
nonce_oracle = RPCNonceOracle(self.accounts[1], self.rpc)
c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[1], self.foo_token_address, d)
self.rpc.do(o)
nonce_oracle = RPCNonceOracle(self.accounts[0], self.rpc)
c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[0], self.bar_token_address, d)
self.rpc.do(o)
o = c.declarator_count(self.address, self.foo_token_address, sender_address=self.accounts[0])
r = self.rpc.do(o)
self.assertEqual(c.parse_declarator_count(r), 2)
o = c.declarator_count(self.address, self.bar_token_address, sender_address=self.accounts[0])
r = self.rpc.do(o)
self.assertEqual(c.parse_declarator_count(r), 1)
def test_declaration(self):
d = add_0x(os.urandom(32).hex())
d_two = add_0x(os.urandom(32).hex())
nonce_oracle = RPCNonceOracle(self.accounts[1], self.rpc)
c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[1], self.foo_token_address, d)
self.rpc.do(o)
c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[1], self.foo_token_address, d_two)
self.rpc.do(o)
nonce_oracle = RPCNonceOracle(self.accounts[2], self.rpc)
c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[2], self.foo_token_address, d)
self.rpc.do(o)
c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[2], self.bar_token_address, d)
self.rpc.do(o)
o = c.declaration(self.address, self.accounts[1], self.foo_token_address, sender_address=self.accounts[0])
r = self.rpc.do(o)
proofs = c.parse_declaration(r)
self.assertEqual(proofs[0], d[2:])
self.assertEqual(proofs[1], d_two[2:])
def test_declarator_to_subject(self):
d = add_0x(os.urandom(32).hex())
nonce_oracle = RPCNonceOracle(self.accounts[1], self.rpc)
c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[1], self.foo_token_address, d)
self.rpc.do(o)
nonce_oracle = RPCNonceOracle(self.accounts[2], self.rpc)
c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[2], self.foo_token_address, d)
self.rpc.do(o)
nonce_oracle = RPCNonceOracle(self.accounts[1], self.rpc)
c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[1], self.bar_token_address, d)
self.rpc.do(o)
o = c.declaration_address_at(self.address, self.accounts[1], 0, sender_address=self.accounts[0])
r = self.rpc.do(o)
self.assertEqual(c.parse_declaration_address_at(r), self.foo_token_address)
o = c.declaration_address_at(self.address, self.accounts[2], 0, sender_address=self.accounts[0])
r = self.rpc.do(o)
self.assertEqual(c.parse_declaration_address_at(r), self.foo_token_address)
o = c.declaration_address_at(self.address, self.accounts[1], 1, sender_address=self.accounts[0])
r = self.rpc.do(o)
self.assertEqual(c.parse_declaration_address_at(r), self.bar_token_address)
def test_subject_to_declarator(self):
d = '0x' + os.urandom(32).hex()
nonce_oracle = RPCNonceOracle(self.accounts[1], self.rpc)
c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[1], self.foo_token_address, d)
self.rpc.do(o)
nonce_oracle = RPCNonceOracle(self.accounts[2], self.rpc)
c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[2], self.foo_token_address, d)
self.rpc.do(o)
nonce_oracle = RPCNonceOracle(self.accounts[1], self.rpc)
c = AddressDeclarator(signer=self.signer, nonce_oracle=nonce_oracle)
(tx_hash_hex, o) = c.add_declaration(self.address, self.accounts[1], self.bar_token_address, d)
self.rpc.do(o)
o = c.declarator_address_at(self.address, self.foo_token_address, 0, sender_address=self.accounts[0])
r = self.rpc.do(o)
self.assertEqual(c.parse_declaration_address_at(r), self.accounts[1])
o = c.declarator_address_at(self.address, self.foo_token_address, 1, sender_address=self.accounts[0])
r = self.rpc.do(o)
self.assertEqual(c.parse_declaration_address_at(r), self.accounts[2])
if __name__ == '__main__':
unittest.main()

View File

@ -1,147 +0,0 @@
import os
import unittest
import json
import logging
import hashlib
import web3
import eth_tester
import eth_abi
logging.basicConfig(level=logging.DEBUG)
logg = logging.getLogger()
logging.getLogger('web3').setLevel(logging.WARNING)
logging.getLogger('eth.vm').setLevel(logging.WARNING)
testdir = os.path.dirname(__file__)
description = '0x{:<064s}'.format(b'foo'.hex())
class Test(unittest.TestCase):
contract = None
def setUp(self):
eth_params = eth_tester.backends.pyevm.main.get_default_genesis_params({
'gas_limit': 9000000,
})
# create store of used accounts
f = open(os.path.join(testdir, '../eth_address_declarator/data/AddressDeclarator.bin'), 'r')
bytecode = f.read()
f.close()
f = open(os.path.join(testdir, '../eth_address_declarator/data/AddressDeclarator.json'), 'r')
self.abi = json.load(f)
f.close()
backend = eth_tester.PyEVMBackend(eth_params)
self.eth_tester = eth_tester.EthereumTester(backend)
provider = web3.Web3.EthereumTesterProvider(self.eth_tester)
self.w3 = web3.Web3(provider)
c = self.w3.eth.contract(abi=self.abi, bytecode=bytecode)
tx_hash = c.constructor(description).transact({'from': self.w3.eth.accounts[0]})
r = self.w3.eth.getTransactionReceipt(tx_hash)
self.address = r.contractAddress
# create token
f = open(os.path.join(testdir, '../eth_address_declarator/data/GiftableToken.bin'), 'r')
bytecode = f.read()
f.close()
f = open(os.path.join(testdir, '../eth_address_declarator/data/GiftableToken.json'), 'r')
self.abi_token = json.load(f)
f.close()
t = self.w3.eth.contract(abi=self.abi_token, bytecode=bytecode)
tx_hash = t.constructor('Foo Token', 'FOO', 18).transact({'from': self.w3.eth.accounts[0]})
r = self.w3.eth.getTransactionReceipt(tx_hash)
self.address_token_one = r.contractAddress
t = self.w3.eth.contract(abi=self.abi_token, bytecode=bytecode)
tx_hash = t.constructor('Bar Token', 'BAR', 18).transact({'from': self.w3.eth.accounts[0]})
r = self.w3.eth.getTransactionReceipt(tx_hash)
self.address_token_two = r.contractAddress
def tearDown(self):
pass
def test_basic(self):
c = self.w3.eth.contract(abi=self.abi, address=self.address)
d = '0x' + os.urandom(32).hex()
c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[0]})
c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[1]})
c.functions.addDeclaration(self.address_token_two, d).transact({'from': self.w3.eth.accounts[0]})
self.assertEqual(c.functions.declaratorCount(self.address_token_one).call(), 2)
self.assertEqual(c.functions.declaratorCount(self.address_token_two).call(), 1)
def test_declaration(self):
c = self.w3.eth.contract(abi=self.abi, address=self.address)
d = '0x' + os.urandom(32).hex()
d_two = '0x' + os.urandom(32).hex()
c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[1]})
c.functions.addDeclaration(self.address_token_one, d_two).transact({'from': self.w3.eth.accounts[1]})
c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[2]})
c.functions.addDeclaration(self.address_token_two, d).transact({'from': self.w3.eth.accounts[2]})
proofs = c.functions.declaration(self.w3.eth.accounts[1], self.address_token_one).call()
self.assertEqual(proofs[0].hex(), d[2:])
self.assertEqual(proofs[1].hex(), d_two[2:])
def test_declaration_count(self):
c = self.w3.eth.contract(abi=self.abi, address=self.address)
d = '0x' + os.urandom(32).hex()
d_two = '0x' + os.urandom(32).hex()
c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[1]})
c.functions.addDeclaration(self.address_token_one, d_two).transact({'from': self.w3.eth.accounts[1]})
c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[2]})
c.functions.addDeclaration(self.address_token_two, d).transact({'from': self.w3.eth.accounts[2]})
self.assertEqual(c.functions.declarationCount(self.w3.eth.accounts[1]).call(), 1)
self.assertEqual(c.functions.declarationCount(self.w3.eth.accounts[2]).call(), 2)
def test_declarator_to_subject(self):
c = self.w3.eth.contract(abi=self.abi, address=self.address)
d = '0x' + os.urandom(32).hex()
c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[1]})
c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[2]})
c.functions.addDeclaration(self.address_token_two, d).transact({'from': self.w3.eth.accounts[1]})
self.assertEqual(c.functions.declarationAddressAt(self.w3.eth.accounts[1], 0).call(), self.address_token_one)
self.assertEqual(c.functions.declarationAddressAt(self.w3.eth.accounts[2], 0).call(), self.address_token_one)
self.assertEqual(c.functions.declarationAddressAt(self.w3.eth.accounts[1], 1).call(), self.address_token_two)
def test_subject_to_declarator(self):
c = self.w3.eth.contract(abi=self.abi, address=self.address)
d = '0x' + os.urandom(32).hex()
c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[1]})
c.functions.addDeclaration(self.address_token_one, d).transact({'from': self.w3.eth.accounts[2]})
c.functions.addDeclaration(self.address_token_two, d).transact({'from': self.w3.eth.accounts[1]})
self.assertEqual(c.functions.declaratorAddressAt(self.address_token_one, 0).call(), self.w3.eth.accounts[1])
self.assertEqual(c.functions.declaratorAddressAt(self.address_token_one, 1).call(), self.w3.eth.accounts[2])
if __name__ == '__main__':
unittest.main()