329 lines
8.1 KiB
Python
329 lines
8.1 KiB
Python
# standard imports
|
|
import enum
|
|
import re
|
|
import logging
|
|
|
|
# external imports
|
|
from hexathon import (
|
|
strip_0x,
|
|
pad,
|
|
)
|
|
|
|
# local imports
|
|
from chainlib.hash import keccak256_string_to_hex
|
|
from chainlib.block import BlockSpec
|
|
from chainlib.jsonrpc import JSONRPCRequest
|
|
from .address import to_checksum_address
|
|
|
|
#logg = logging.getLogger(__name__)
|
|
logg = logging.getLogger()
|
|
|
|
|
|
re_method = r'^[a-zA-Z0-9_]+$'
|
|
|
|
class ABIContractType(enum.Enum):
|
|
|
|
BYTES32 = 'bytes32'
|
|
BYTES4 = 'bytes4'
|
|
UINT256 = 'uint256'
|
|
ADDRESS = 'address'
|
|
STRING = 'string'
|
|
BOOLEAN = 'bool'
|
|
|
|
dynamic_contract_types = [
|
|
ABIContractType.STRING,
|
|
]
|
|
|
|
|
|
class ABIContract:
|
|
|
|
def __init__(self):
|
|
self.types = []
|
|
self.contents = []
|
|
|
|
|
|
class ABIMethodEncoder(ABIContract):
|
|
|
|
def __init__(self):
|
|
super(ABIMethodEncoder, self).__init__()
|
|
self.method_name = None
|
|
self.method_contents = []
|
|
|
|
|
|
def method(self, m):
|
|
if re.match(re_method, m) == None:
|
|
raise ValueError('Invalid method {}, must match regular expression {}'.format(re_method))
|
|
self.method_name = m
|
|
self.__log_method()
|
|
|
|
|
|
def get_method(self):
|
|
if self.method_name == None:
|
|
return ''
|
|
return '{}({})'.format(self.method_name, ','.join(self.method_contents))
|
|
|
|
|
|
def typ(self, v):
|
|
if self.method_name == None:
|
|
raise AttributeError('method name must be set before adding types')
|
|
if not isinstance(v, ABIContractType):
|
|
raise TypeError('method type not valid; expected {}, got {}'.format(type(ABIContractType).__name__, type(v).__name__))
|
|
self.method_contents.append(v.value)
|
|
self.__log_method()
|
|
|
|
|
|
def __log_method(self):
|
|
logg.debug('method set to {}'.format(self.get_method()))
|
|
|
|
|
|
|
|
class ABIContractDecoder:
|
|
|
|
|
|
def typ(self, v):
|
|
if not isinstance(v, ABIContractType):
|
|
raise TypeError('method type not valid; expected {}, got {}'.format(type(ABIContractType).__name__, type(v).__name__))
|
|
self.types.append(v.value)
|
|
self.__log_typ()
|
|
|
|
|
|
def val(self, v):
|
|
self.contents.append(v)
|
|
logg.debug('content is now {}'.format(self.contents))
|
|
|
|
|
|
def uint256(self, v):
|
|
return int(v, 16)
|
|
|
|
|
|
def bytes32(self, v):
|
|
return v
|
|
|
|
|
|
def bool(self, v):
|
|
return bool(self.uint256(v))
|
|
|
|
|
|
def boolean(self, v):
|
|
return bool(self.uint256(v))
|
|
|
|
|
|
def address(self, v):
|
|
a = strip_0x(v)[64-40:]
|
|
return to_checksum_address(a)
|
|
|
|
|
|
def string(self, v):
|
|
s = strip_0x(v)
|
|
b = bytes.fromhex(s)
|
|
cursor = 0
|
|
offset = int.from_bytes(b[cursor:cursor+32], 'big')
|
|
cursor += 32
|
|
length = int.from_bytes(b[cursor:cursor+32], 'big')
|
|
cursor += 32
|
|
content = b[cursor:cursor+length]
|
|
logg.debug('parsing string offset {} length {} content {}'.format(offset, length, content))
|
|
return content.decode('utf-8')
|
|
|
|
|
|
def __log_typ(self):
|
|
logg.debug('types set to ({})'.format(','.join(self.types)))
|
|
|
|
|
|
def decode(self):
|
|
r = []
|
|
logg.debug('contents {}'.format(self.contents))
|
|
for i in range(len(self.types)):
|
|
m = getattr(self, self.types[i])
|
|
s = self.contents[i]
|
|
logg.debug('{} {} {} {} {}'.format(i, m, self.types[i], self.contents[i], s))
|
|
r.append(m(s.hex()))
|
|
return r
|
|
|
|
|
|
def get(self):
|
|
return self.decode()
|
|
|
|
|
|
def __str__(self):
|
|
return self.decode()
|
|
|
|
|
|
class ABIContractLogDecoder(ABIMethodEncoder, ABIContractDecoder):
|
|
|
|
def __init__(self):
|
|
super(ABIContractLogDecoder, self).__init__()
|
|
self.method_name = None
|
|
self.indexed_content = []
|
|
|
|
|
|
def topic(self, event):
|
|
self.method(event)
|
|
|
|
|
|
def get_method_signature(self):
|
|
s = self.get_method()
|
|
return keccak256_string_to_hex(s)
|
|
|
|
|
|
def typ(self, v):
|
|
super(ABIContractLogDecoder, self).typ(v)
|
|
self.types.append(v.value)
|
|
|
|
|
|
def apply(self, topics, data):
|
|
t = self.get_method_signature()
|
|
if topics[0] != t:
|
|
raise ValueError('topic mismatch')
|
|
for i in range(len(topics) - 1):
|
|
self.contents.append(topics[i+1])
|
|
self.contents += data
|
|
|
|
|
|
class ABIContractEncoder(ABIMethodEncoder):
|
|
|
|
def __log_latest(self, v):
|
|
l = len(self.types) - 1
|
|
logg.debug('Encoder added {} -> {} ({})'.format(v, self.contents[l], self.types[l].value))
|
|
|
|
|
|
def uint256(self, v):
|
|
v = int(v)
|
|
b = v.to_bytes(32, 'big')
|
|
self.contents.append(b.hex())
|
|
self.types.append(ABIContractType.UINT256)
|
|
self.__log_latest(v)
|
|
|
|
|
|
def bool(self, v):
|
|
return self.boolean(v)
|
|
|
|
|
|
def boolean(self, v):
|
|
if bool(v):
|
|
return self.uint256(1)
|
|
return self.uint256(0)
|
|
|
|
|
|
def address(self, v):
|
|
self.bytes_fixed(32, v, 20)
|
|
self.types.append(ABIContractType.ADDRESS)
|
|
self.__log_latest(v)
|
|
|
|
|
|
def bytes32(self, v):
|
|
self.bytes_fixed(32, v)
|
|
self.types.append(ABIContractType.BYTES32)
|
|
self.__log_latest(v)
|
|
|
|
|
|
def bytes4(self, v):
|
|
self.bytes_fixed(4, v)
|
|
self.types.append(ABIContractType.BYTES4)
|
|
self.__log_latest(v)
|
|
|
|
|
|
|
|
def string(self, v):
|
|
b = v.encode('utf-8')
|
|
l = len(b)
|
|
contents = l.to_bytes(32, 'big')
|
|
contents += b
|
|
padlen = 32 - (l % 32)
|
|
contents += padlen * b'\x00'
|
|
self.bytes_fixed(len(contents), contents)
|
|
self.types.append(ABIContractType.STRING)
|
|
self.__log_latest(v)
|
|
return contents
|
|
|
|
|
|
def bytes_fixed(self, mx, v, exact=0):
|
|
typ = type(v).__name__
|
|
if typ == 'str':
|
|
v = strip_0x(v)
|
|
l = len(v)
|
|
if exact > 0 and l != exact * 2:
|
|
raise ValueError('value wrong size; expected {}, got {})'.format(mx, l))
|
|
if l > mx * 2:
|
|
raise ValueError('value too long ({})'.format(l))
|
|
v = pad(v, mx)
|
|
elif typ == 'bytes':
|
|
l = len(v)
|
|
if exact > 0 and l != exact:
|
|
raise ValueError('value wrong size; expected {}, got {})'.format(mx, l))
|
|
b = bytearray(mx)
|
|
b[mx-l:] = v
|
|
v = pad(b.hex(), mx)
|
|
else:
|
|
raise ValueError('invalid input {}'.format(typ))
|
|
self.contents.append(v.ljust(64, '0'))
|
|
|
|
|
|
|
|
def get_method_signature(self):
|
|
s = self.get_method()
|
|
if s == '':
|
|
return s
|
|
return keccak256_string_to_hex(s)[:8]
|
|
|
|
|
|
def get_contents(self):
|
|
direct_contents = ''
|
|
pointer_contents = ''
|
|
l = len(self.types)
|
|
pointer_cursor = 32 * l
|
|
for i in range(l):
|
|
if self.types[i] in dynamic_contract_types:
|
|
content_length = len(self.contents[i])
|
|
pointer_contents += self.contents[i]
|
|
direct_contents += pointer_cursor.to_bytes(32, 'big').hex()
|
|
pointer_cursor += int(content_length / 2)
|
|
else:
|
|
direct_contents += self.contents[i]
|
|
s = ''.join(direct_contents + pointer_contents)
|
|
for i in range(0, len(s), 64):
|
|
l = len(s) - i
|
|
if l > 64:
|
|
l = 64
|
|
logg.debug('code word {} {}'.format(int(i / 64), s[i:i+64]))
|
|
return s
|
|
|
|
|
|
def get(self):
|
|
return self.encode()
|
|
|
|
|
|
def encode(self):
|
|
m = self.get_method_signature()
|
|
c = self.get_contents()
|
|
return m + c
|
|
|
|
|
|
def __str__(self):
|
|
return self.encode()
|
|
|
|
|
|
|
|
def abi_decode_single(typ, v):
|
|
d = ABIContractDecoder()
|
|
d.typ(typ)
|
|
d.val(v)
|
|
r = d.decode()
|
|
return r[0]
|
|
|
|
|
|
def code(address, block_spec=BlockSpec.LATEST, id_generator=None):
|
|
block_height = None
|
|
if block_spec == BlockSpec.LATEST:
|
|
block_height = 'latest'
|
|
elif block_spec == BlockSpec.PENDING:
|
|
block_height = 'pending'
|
|
else:
|
|
block_height = int(block_spec)
|
|
j = JSONRPCRequest(id_generator)
|
|
o = j.template()
|
|
o['method'] = 'eth_getCode'
|
|
o['params'].append(address)
|
|
o['params'].append(block_height)
|
|
return j.finalize(o)
|