chainlib/chainlib/eth/contract.py

289 lines
7.0 KiB
Python

# standard imports
import enum
import re
import logging
# external imports
from hexathon import (
strip_0x,
pad,
)
# local imports
from chainlib.hash import keccak256_string_to_hex
from chainlib.block import BlockSpec
from chainlib.jsonrpc import jsonrpc_template
from .address import to_checksum_address
#logg = logging.getLogger(__name__)
logg = logging.getLogger()
re_method = r'^[a-zA-Z0-9_]+$'
class ABIContractType(enum.Enum):
BYTES32 = 'bytes32'
BYTES4 = 'bytes4'
UINT256 = 'uint256'
ADDRESS = 'address'
STRING = 'string'
BOOLEAN = 'bool'
dynamic_contract_types = [
ABIContractType.STRING,
]
class ABIContractDecoder:
def __init__(self):
self.types = []
self.contents = []
def typ(self, v):
if not isinstance(v, ABIContractType):
raise TypeError('method type not valid; expected {}, got {}'.format(type(ABIContractType).__name__, type(v).__name__))
self.types.append(v.value)
self.__log_typ()
def val(self, v):
self.contents.append(v)
logg.debug('content is now {}'.format(self.contents))
def uint256(self, v):
return int(v, 16)
def bytes32(self, v):
return v
def bool(self, v):
return bool(self.uint256(v))
def boolean(self, v):
return bool(self.uint256(v))
def address(self, v):
a = strip_0x(v)[64-40:]
return to_checksum_address(a)
def string(self, v):
s = strip_0x(v)
b = bytes.fromhex(s)
cursor = 0
offset = int.from_bytes(b[cursor:cursor+32], 'big')
cursor += 32
length = int.from_bytes(b[cursor:cursor+32], 'big')
cursor += 32
content = b[cursor:cursor+length]
logg.debug('parsing string offset {} length {} content {}'.format(offset, length, content))
return content.decode('utf-8')
def __log_typ(self):
logg.debug('types set to ({})'.format(','.join(self.types)))
def decode(self):
r = []
for i in range(len(self.types)):
m = getattr(self, self.types[i])
r.append(m(self.contents[i]))
return r
def get(self):
return self.decode()
def __str__(self):
return self.decode()
class ABIContractEncoder:
def __init__(self):
self.types = []
self.contents = []
self.method_name = None
self.method_contents = []
def method(self, m):
if re.match(re_method, m) == None:
raise ValueError('Invalid method {}, must match regular expression {}'.format(re_method))
self.method_name = m
self.__log_method()
def typ(self, v):
if self.method_name == None:
raise AttributeError('method name must be set before adding types')
if not isinstance(v, ABIContractType):
raise TypeError('method type not valid; expected {}, got {}'.format(type(ABIContractType).__name__, type(v).__name__))
self.method_contents.append(v.value)
self.__log_method()
def __log_method(self):
logg.debug('method set to {}'.format(self.get_method()))
def __log_latest(self, v):
l = len(self.types) - 1
logg.debug('Encoder added {} -> {} ({})'.format(v, self.contents[l], self.types[l].value))
def uint256(self, v):
v = int(v)
b = v.to_bytes(32, 'big')
self.contents.append(b.hex())
self.types.append(ABIContractType.UINT256)
self.__log_latest(v)
def bool(self, v):
return self.boolean(v)
def boolean(self, v):
if bool(v):
return self.uint256(1)
return self.uint256(0)
def address(self, v):
self.bytes_fixed(32, v, 20)
self.types.append(ABIContractType.ADDRESS)
self.__log_latest(v)
def bytes32(self, v):
self.bytes_fixed(32, v)
self.types.append(ABIContractType.BYTES32)
self.__log_latest(v)
def bytes4(self, v):
self.bytes_fixed(4, v)
self.types.append(ABIContractType.BYTES4)
self.__log_latest(v)
def string(self, v):
b = v.encode('utf-8')
l = len(b)
contents = l.to_bytes(32, 'big')
contents += b
padlen = 32 - (l % 32)
contents += padlen * b'\x00'
self.bytes_fixed(len(contents), contents)
self.types.append(ABIContractType.STRING)
self.__log_latest(v)
return contents
def bytes_fixed(self, mx, v, exact=0):
typ = type(v).__name__
if typ == 'str':
v = strip_0x(v)
l = len(v)
if exact > 0 and l != exact * 2:
raise ValueError('value wrong size; expected {}, got {})'.format(mx, l))
if l > mx * 2:
raise ValueError('value too long ({})'.format(l))
v = pad(v, mx)
elif typ == 'bytes':
l = len(v)
if exact > 0 and l != exact:
raise ValueError('value wrong size; expected {}, got {})'.format(mx, l))
b = bytearray(mx)
b[mx-l:] = v
v = pad(b.hex(), mx)
else:
raise ValueError('invalid input {}'.format(typ))
self.contents.append(v.ljust(64, '0'))
def get_method(self):
if self.method_name == None:
return ''
return '{}({})'.format(self.method_name, ','.join(self.method_contents))
def get_method_signature(self):
s = self.get_method()
if s == '':
return s
return keccak256_string_to_hex(s)[:8]
def get_contents(self):
direct_contents = ''
pointer_contents = ''
l = len(self.types)
pointer_cursor = 32 * l
for i in range(l):
if self.types[i] in dynamic_contract_types:
content_length = len(self.contents[i])
pointer_contents += self.contents[i]
direct_contents += pointer_cursor.to_bytes(32, 'big').hex()
pointer_cursor += int(content_length / 2)
else:
direct_contents += self.contents[i]
s = ''.join(direct_contents + pointer_contents)
for i in range(0, len(s), 64):
l = len(s) - i
if l > 64:
l = 64
logg.debug('code word {} {}'.format(int(i / 64), s[i:i+64]))
return s
def get(self):
return self.encode()
def encode(self):
m = self.get_method_signature()
c = self.get_contents()
return m + c
def __str__(self):
return self.encode()
def abi_decode_single(typ, v):
d = ABIContractDecoder()
d.typ(typ)
d.val(v)
r = d.decode()
return r[0]
def code(address, block_spec=BlockSpec.LATEST):
block_height = None
if block_spec == BlockSpec.LATEST:
block_height = 'latest'
elif block_spec == BlockSpec.PENDING:
block_height = 'pending'
else:
block_height = int(block_spec)
o = jsonrpc_template()
o['method'] = 'eth_getCode'
o['params'].append(address)
o['params'].append(block_height)
return o