chainlib-eth/chainlib/eth/contract.py
2021-08-21 07:27:40 +00:00

511 lines
14 KiB
Python

# standard imports
import enum
import re
import logging
# external imports
from hexathon import (
strip_0x,
pad,
)
# local imports
from chainlib.hash import keccak256_string_to_hex
from chainlib.block import BlockSpec
from chainlib.jsonrpc import JSONRPCRequest
from .address import to_checksum_address
logg = logging.getLogger(__name__)
re_method = r'^[a-zA-Z0-9_]+$'
class ABIContractType(enum.Enum):
"""Data types used by ABI encoders
"""
BYTES32 = 'bytes32'
BYTES4 = 'bytes4'
UINT256 = 'uint256'
ADDRESS = 'address'
STRING = 'string'
BOOLEAN = 'bool'
dynamic_contract_types = [
ABIContractType.STRING,
]
class ABIContract:
"""Base class for Ethereum smart contract encoder
"""
def __init__(self):
self.types = []
self.contents = []
class ABIMethodEncoder(ABIContract):
"""Generate ABI method signatures from method signature string.
"""
def __init__(self):
super(ABIMethodEncoder, self).__init__()
self.method_name = None
self.method_contents = []
def method(self, m):
"""Set method name.
:param m: Method name
:type m: str
:raises ValueError: Invalid method name
"""
if re.match(re_method, m) == None:
raise ValueError('Invalid method {}, must match regular expression {}'.format(re_method))
self.method_name = m
self.__log_method()
def get_method(self):
"""Return currently set method signature string.
:rtype: str
:returns: Method signature
"""
if self.method_name == None:
return ''
return '{}({})'.format(self.method_name, ','.join(self.method_contents))
def typ(self, v):
"""Add argument type to argument vector.
Method name must be set before this is called.
:param v: Type to add
:type v: chainlib.eth.contract.ABIContractType
:raises AttributeError: Type set before method name
:raises TypeError: Invalid type
"""
if self.method_name == None:
raise AttributeError('method name must be set before adding types')
if not isinstance(v, ABIContractType):
raise TypeError('method type not valid; expected {}, got {}'.format(type(ABIContractType).__name__, type(v).__name__))
self.method_contents.append(v.value)
self.__log_method()
def __log_method(self):
logg.debug('method set to {}'.format(self.get_method()))
class ABIContractDecoder(ABIContract):
"""Decode serialized ABI contract input data to corresponding python primitives.
"""
def typ(self, v):
"""Add type to argument array to parse input against.
:param v: Type
:type v: chainlib.eth.contract.ABIContractType
:raises TypeError: Invalid type
"""
if not isinstance(v, ABIContractType):
raise TypeError('method type not valid; expected {}, got {}'.format(type(ABIContractType).__name__, type(v).__name__))
self.types.append(v.value)
self.__log_typ()
def val(self, v):
"""Add value to value array.
:param v: Value, in hex
:type v: str
"""
self.contents.append(v)
logg.debug('content is now {}'.format(self.contents))
def uint256(self, v):
"""Parse value as uint256.
:param v: Value, in hex
:type v: str
:rtype: int
:returns: Int value
"""
return int(v, 16)
def bytes32(self, v):
"""Parse value as bytes32.
:param v: Value, in hex
:type v: str
:rtype: str
:returns: Value, in hex
"""
return v
def bool(self, v):
"""Parse value as bool.
:param v: Value, in hex
:type v: str
:rtype: bool
:returns: Value
"""
return bool(self.uint256(v))
def boolean(self, v):
"""Alias of chainlib.eth.contract.ABIContractDecoder.bool
"""
return bool(self.uint256(v))
def address(self, v):
"""Parse value as address.
:param v: Value, in hex
:type v: str
:rtype: str
:returns: Value. in hex
"""
a = strip_0x(v)[64-40:]
return to_checksum_address(a)
def string(self, v):
"""Parse value as string.
:param v: Value, in hex
:type v: str
:rtype: str
:returns: Value
"""
s = strip_0x(v)
b = bytes.fromhex(s)
cursor = 0
offset = int.from_bytes(b[cursor:cursor+32], 'big')
cursor += 32
length = int.from_bytes(b[cursor:cursor+32], 'big')
cursor += 32
content = b[cursor:cursor+length]
logg.debug('parsing string offset {} length {} content {}'.format(offset, length, content))
return content.decode('utf-8')
def __log_typ(self):
logg.debug('types set to ({})'.format(','.join(self.types)))
def decode(self):
"""Apply decoder on value array using argument type array.
:rtype: list
:returns: List of decoded values
"""
r = []
logg.debug('contents {}'.format(self.contents))
for i in range(len(self.types)):
m = getattr(self, self.types[i])
s = self.contents[i]
r.append(m(s))
return r
def get(self):
"""Alias of chainlib.eth.contract.ABIContractDecoder.decode
"""
return self.decode()
def __str__(self):
return self.decode()
class ABIContractLogDecoder(ABIMethodEncoder, ABIContractDecoder):
"""Decoder utils for log entries of an Ethereum network transaction receipt.
Uses chainlib.eth.contract.ABIContractDecoder.decode to render output from template.
"""
def __init__(self):
super(ABIContractLogDecoder, self).__init__()
self.method_name = None
self.indexed_content = []
def topic(self, event):
"""Set topic to match.
:param event: Topic name
:type event: str
"""
self.method(event)
def get_method_signature(self):
"""Generate topic signature from set topic.
:rtype: str
:returns: Topic signature, in hex
"""
s = self.get_method()
return keccak256_string_to_hex(s)
def typ(self, v):
"""Add type to event argument array.
:param v: Type
:type v: chainlib.eth.contract.ABIContractType
"""
super(ABIContractLogDecoder, self).typ(v)
self.types.append(v.value)
def apply(self, topics, data):
"""Set log entry data to parse.
After set, self.decode can be used to render the output.
:param topics: The topics array of the receipt, list of hex
:type topics: list
:param data: Non-indexed data, in hex
:type data: str
:raises ValueError: Topic of input does not match topic set in parser
"""
t = self.get_method_signature()
if topics[0] != t:
raise ValueError('topic mismatch')
for i in range(len(topics) - 1):
self.contents.append(topics[i+1])
self.contents += data
class ABIContractEncoder(ABIMethodEncoder):
def __log_latest(self, v):
l = len(self.types) - 1
logg.debug('Encoder added {} -> {} ({})'.format(v, self.contents[l], self.types[l].value))
def uint256(self, v):
"""Encode value to uint256 and add to input value vector.
:param v: Integer value
:type v: int
"""
v = int(v)
b = v.to_bytes(32, 'big')
self.contents.append(b.hex())
self.types.append(ABIContractType.UINT256)
self.__log_latest(v)
def bool(self, v):
"""Alias of chainlib.eth.contract.ABIContractEncoder.boolean.
"""
return self.boolean(v)
def boolean(self, v):
"""Encode value to boolean and add to input value vector.
:param v: Trueish or falsish value
:type v: any
:rtype: See chainlib.eth.contract.ABIContractEncoder.uint256
:returns: See chainlib.eth.contract.ABIContractEncoder.uint256
"""
if bool(v):
return self.uint256(1)
return self.uint256(0)
def address(self, v):
"""Encode value to address and add to input value vector.
:param v: Ethereum address, in hex
:type v: str
"""
self.bytes_fixed(32, v, 20)
self.types.append(ABIContractType.ADDRESS)
self.__log_latest(v)
def bytes32(self, v):
"""Encode value to bytes32 and add to input value vector.
:param v: Bytes, in hex
:type v: str
"""
self.bytes_fixed(32, v)
self.types.append(ABIContractType.BYTES32)
self.__log_latest(v)
def bytes4(self, v):
"""Encode value to bytes4 and add to input value vector.
:param v: Bytes, in hex
:type v: str
"""
self.bytes_fixed(4, v)
self.types.append(ABIContractType.BYTES4)
self.__log_latest(v)
def string(self, v):
"""Encode value to string and add to input value vector.
:param v: String input
:type v: str
"""
b = v.encode('utf-8')
l = len(b)
contents = l.to_bytes(32, 'big')
contents += b
padlen = 32 - (l % 32)
contents += padlen * b'\x00'
self.bytes_fixed(len(contents), contents)
self.types.append(ABIContractType.STRING)
self.__log_latest(v)
return contents
def bytes_fixed(self, mx, v, exact=0):
"""Add arbirary length byte data to value vector.
:param mx: Max length of input data.
:type mx: int
:param v: Byte input, hex or bytes
:type v: str | bytes
:param exact: Fail parsing if input does not translate to given byte length.
:type exact: int
:raises ValueError: Input length or input format mismatch.
"""
typ = type(v).__name__
if typ == 'str':
v = strip_0x(v)
l = len(v)
if exact > 0 and l != exact * 2:
raise ValueError('value wrong size; expected {}, got {})'.format(mx, l))
if l > mx * 2:
raise ValueError('value too long ({})'.format(l))
v = pad(v, mx)
elif typ == 'bytes':
l = len(v)
if exact > 0 and l != exact:
raise ValueError('value wrong size; expected {}, got {})'.format(mx, l))
b = bytearray(mx)
b[mx-l:] = v
v = pad(b.hex(), mx)
else:
raise ValueError('invalid input {}'.format(typ))
self.contents.append(v.ljust(64, '0'))
def get_method_signature(self):
"""Return abi encoded signature of currently set method.
"""
s = self.get_method()
if s == '':
return s
return keccak256_string_to_hex(s)[:8]
def get_contents(self):
"""Encode value array.
:rtype: str
:returns: ABI encoded values, in hex
"""
direct_contents = ''
pointer_contents = ''
l = len(self.types)
pointer_cursor = 32 * l
for i in range(l):
if self.types[i] in dynamic_contract_types:
content_length = len(self.contents[i])
pointer_contents += self.contents[i]
direct_contents += pointer_cursor.to_bytes(32, 'big').hex()
pointer_cursor += int(content_length / 2)
else:
direct_contents += self.contents[i]
s = ''.join(direct_contents + pointer_contents)
for i in range(0, len(s), 64):
l = len(s) - i
if l > 64:
l = 64
logg.debug('code word {} {}'.format(int(i / 64), s[i:i+64]))
return s
def get(self):
"""Alias of chainlib.eth.contract.ABIContractEncoder.encode
"""
return self.encode()
def encode(self):
"""Encode method and value array.
The data generated by this method is the literal data used as input to contract calls or transactions.
:rtype: str
:returns: ABI encoded contract input data, in hex
"""
m = self.get_method_signature()
c = self.get_contents()
return m + c
def __str__(self):
return self.encode()
def abi_decode_single(typ, v):
"""Convenience function to decode a single ABI encoded value against a given type.
:param typ: Type to parse value as
:type typ: chainlib.eth.contract.ABIContractEncoder
:param v: Value to parse, in hex
:type v: str
"""
d = ABIContractDecoder()
d.typ(typ)
d.val(v)
r = d.decode()
return r[0]
def code(address, block_spec=BlockSpec.LATEST, id_generator=None):
"""Generate json-rpc query to retrieve code stored at an Ethereum address.
:param address: Address to use for query, in hex
:type address: str
:param block_spec: Block height spec
:type block_spec: chainlib.block.BlockSpec
:param id_generator: json-rpc id generator
:type id_generator: chainlib.jsonrpc.JSONRPCIdGenerator
:rtype: dict
:returns: rpc query object
"""
block_height = None
if block_spec == BlockSpec.LATEST:
block_height = 'latest'
elif block_spec == BlockSpec.PENDING:
block_height = 'pending'
else:
block_height = int(block_spec)
j = JSONRPCRequest(id_generator)
o = j.template()
o['method'] = 'eth_getCode'
o['params'].append(address)
o['params'].append(block_height)
return j.finalize(o)