cic-internal-integration/apps/cic-eth/cic_eth/eth/token.py
2021-03-01 20:15:17 +00:00

526 lines
17 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# standard imports
import logging
# third-party imports
import celery
import requests
import web3
from cic_registry import CICRegistry
from cic_registry import zero_address
from cic_registry.chain import ChainSpec
from hexathon import strip_0x
from chainlib.status import Status as TxStatus
# platform imports
from cic_eth.db.models.tx import TxCache
from cic_eth.db.models.base import SessionBase
from cic_eth.eth import RpcClient
from cic_eth.error import TokenCountError, PermanentTxError, OutOfGasError, NotLocalTxError
from cic_eth.eth.task import sign_and_register_tx
from cic_eth.eth.task import create_check_gas_and_send_task
from cic_eth.eth.factory import TxFactory
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.ext.address import translate_address
from cic_eth.task import (
CriticalSQLAlchemyTask,
CriticalWeb3Task,
)
celery_app = celery.current_app
logg = logging.getLogger()
# TODO: fetch from cic-contracts instead when implemented
contract_function_signatures = {
'transfer': 'a9059cbb',
'approve': '095ea7b3',
'transferfrom': '23b872dd',
}
class TokenTxFactory(TxFactory):
"""Factory for creating ERC20 token transactions.
"""
def approve(
self,
token_address,
spender_address,
amount,
chain_spec,
):
"""Create an ERC20 "approve" transaction
:param token_address: ERC20 contract address
:type token_address: str, 0x-hex
:param spender_address: Address to approve spending for
:type spender_address: str, 0x-hex
:param amount: Amount of tokens to approve
:type amount: int
:param chain_spec: Chain spec
:type chain_spec: cic_registry.chain.ChainSpec
:returns: Unsigned "approve" transaction in standard Ethereum format
:rtype: dict
"""
source_token = CICRegistry.get_address(chain_spec, token_address)
source_token_contract = source_token.contract
tx_approve_buildable = source_token_contract.functions.approve(
spender_address,
amount,
)
source_token_gas = source_token.gas('transfer')
tx_approve = tx_approve_buildable.buildTransaction({
'from': self.address,
'gas': source_token_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
})
return tx_approve
def transfer(
self,
token_address,
receiver_address,
value,
chain_spec,
):
"""Create an ERC20 "transfer" transaction
:param token_address: ERC20 contract address
:type token_address: str, 0x-hex
:param receiver_address: Address to send tokens to
:type receiver_address: str, 0x-hex
:param amount: Amount of tokens to send
:type amount: int
:param chain_spec: Chain spec
:type chain_spec: cic_registry.chain.ChainSpec
:returns: Unsigned "transfer" transaction in standard Ethereum format
:rtype: dict
"""
source_token = CICRegistry.get_address(chain_spec, token_address)
source_token_contract = source_token.contract
transfer_buildable = source_token_contract.functions.transfer(
receiver_address,
value,
)
source_token_gas = source_token.gas('transfer')
tx_transfer = transfer_buildable.buildTransaction(
{
'from': self.address,
'gas': source_token_gas,
'gasPrice': self.gas_price,
'chainId': chain_spec.chain_id(),
'nonce': self.next_nonce(),
})
return tx_transfer
def unpack_transfer(data):
"""Verifies that a transaction is an "ERC20.transfer" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
data = strip_0x(data)
f = data[:8]
if f != contract_function_signatures['transfer']:
raise ValueError('Invalid transfer data ({})'.format(f))
d = data[8:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'amount': int(d[64:], 16)
}
def unpack_transferfrom(data):
"""Verifies that a transaction is an "ERC20.transferFrom" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
data = strip_0x(data)
f = data[:8]
if f != contract_function_signatures['transferfrom']:
raise ValueError('Invalid transferFrom data ({})'.format(f))
d = data[8:]
return {
'from': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'to': web3.Web3.toChecksumAddress('0x' + d[128-40:128]),
'amount': int(d[128:], 16)
}
def unpack_approve(data):
"""Verifies that a transaction is an "ERC20.approve" transaction, and extracts call parameters from it.
:param data: Raw input data from Ethereum transaction.
:type data: str, 0x-hex
:raises ValueError: Function signature does not match AccountRegister.add
:returns: Parsed parameters
:rtype: dict
"""
data = strip_0x(data)
f = data[:8]
if f != contract_function_signatures['approve']:
raise ValueError('Invalid approval data ({})'.format(f))
d = data[8:]
return {
'to': web3.Web3.toChecksumAddress('0x' + d[64-40:64]),
'amount': int(d[64:], 16)
}
@celery_app.task(base=CriticalWeb3Task)
def balance(tokens, holder_address, chain_str):
"""Return token balances for a list of tokens for given address
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:return: List of balances
:rtype: list of int
"""
#abi = ContractRegistry.abi('ERC20Token')
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
for t in tokens:
#token = CICRegistry.get_address(t['address'])
#abi = token.abi()
#o = c.w3.eth.contract(abi=abi, address=t['address'])
o = CICRegistry.get_address(chain_spec, t['address']).contract
b = o.functions.balanceOf(holder_address).call()
t['balance_network'] = b
return tokens
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def transfer(self, tokens, holder_address, receiver_address, value, chain_str):
"""Transfer ERC20 tokens between addresses
First argument is a list of tokens, to enable the task to be chained to the symbol to token address resolver function. However, it accepts only one token as argument.
:raises TokenCountError: Either none or more then one tokens have been passed as tokens argument
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param receiver_address: Token receiver address
:type receiver_address: str, 0x-hex
:param value: Amount of token, in 'wei'
:type value: int
:param chain_str: Chain spec string representation
:type chain_str: str
:raises TokenCountError: More than one token is passed in tokens list
:return: Transaction hash for tranfer operation
:rtype: str, 0x-hex
"""
# we only allow one token, one transfer
if len(tokens) != 1:
raise TokenCountError
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
# retrieve the token interface
t = tokens[0]
c = RpcClient(chain_spec, holder_address=holder_address)
txf = TokenTxFactory(holder_address, c)
tx_transfer = txf.transfer(t['address'], receiver_address, value, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_transfer')
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_str,
holder_address,
gas_budget,
[tx_hash_hex],
queue,
)
s.apply_async()
return tx_hash_hex
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def approve(self, tokens, holder_address, spender_address, value, chain_str):
"""Approve ERC20 transfer on behalf of holder address
First argument is a list of tokens, to enable the task to be chained to the symbol to token address resolver function. However, it accepts only one token as argument.
:raises TokenCountError: Either none or more then one tokens have been passed as tokens argument
:param tokens: Token addresses
:type tokens: list of str, 0x-hex
:param holder_address: Token holder address
:type holder_address: str, 0x-hex
:param receiver_address: Token receiver address
:type receiver_address: str, 0x-hex
:param value: Amount of token, in 'wei'
:type value: int
:param chain_str: Chain spec string representation
:type chain_str: str
:raises TokenCountError: More than one token is passed in tokens list
:return: Transaction hash for tranfer operation
:rtype: str, 0x-hex
"""
# we only allow one token, one transfer
if len(tokens) != 1:
raise TokenCountError
chain_spec = ChainSpec.from_chain_str(chain_str)
queue = self.request.delivery_info['routing_key']
# retrieve the token interface
t = tokens[0]
c = RpcClient(chain_spec, holder_address=holder_address)
txf = TokenTxFactory(holder_address, c)
tx_transfer = txf.approve(t['address'], spender_address, value, chain_spec)
(tx_hash_hex, tx_signed_raw_hex) = sign_and_register_tx(tx_transfer, chain_str, queue, cache_task='cic_eth.eth.token.otx_cache_approve')
gas_budget = tx_transfer['gas'] * tx_transfer['gasPrice']
s = create_check_gas_and_send_task(
[tx_signed_raw_hex],
chain_str,
holder_address,
gas_budget,
[tx_hash_hex],
queue,
)
s.apply_async()
return tx_hash_hex
@celery_app.task(base=CriticalWeb3Task)
def resolve_tokens_by_symbol(token_symbols, chain_str):
"""Returns contract addresses of an array of ERC20 token symbols
:param token_symbols: Token symbols to resolve
:type token_symbols: list of str
:param chain_str: Chain spec string representation
:type chain_str: str
:return: Respective token contract addresses
:rtype: list of str, 0x-hex
"""
tokens = []
chain_spec = ChainSpec.from_chain_str(chain_str)
for token_symbol in token_symbols:
token = CICRegistry.get_token(chain_spec, token_symbol)
tokens.append({
'address': token.address(),
'converters': [],
})
return tokens
@celery_app.task(base=CriticalSQLAlchemyTask)
def otx_cache_transfer(
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
):
"""Generates and commits transaction cache metadata for an ERC20.transfer or ERC20.transferFrom transaction
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx_signed_raw_hex: Raw signed transaction
:type tx_signed_raw_hex: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
(txc, cache_id) = cache_transfer_data(tx_hash_hex, tx)
return txc
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_transfer_data(
tx_hash_hex,
tx,
):
"""Helper function for otx_cache_transfer
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
tx_data = unpack_transfer(tx['data'])
logg.debug('tx data {}'.format(tx_data))
logg.debug('tx {}'.format(tx))
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx_data['to'],
tx['to'],
tx['to'],
tx_data['amount'],
tx_data['amount'],
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)
@celery_app.task(base=CriticalSQLAlchemyTask)
def otx_cache_approve(
tx_hash_hex,
tx_signed_raw_hex,
chain_str,
):
"""Generates and commits transaction cache metadata for an ERC20.approve transaction
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx_signed_raw_hex: Raw signed transaction
:type tx_signed_raw_hex: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_signed_raw_bytes = bytes.fromhex(tx_signed_raw_hex[2:])
tx = unpack_signed_raw_tx(tx_signed_raw_bytes, chain_spec.chain_id())
(txc, cache_id) = cache_approve_data(tx_hash_hex, tx)
return txc
@celery_app.task(base=CriticalSQLAlchemyTask)
def cache_approve_data(
tx_hash_hex,
tx,
):
"""Helper function for otx_cache_approve
:param tx_hash_hex: Transaction hash
:type tx_hash_hex: str, 0x-hex
:param tx: Signed raw transaction
:type tx: str, 0x-hex
:returns: Transaction hash and id of cache element in storage backend, respectively
:rtype: tuple
"""
tx_data = unpack_approve(tx['data'])
logg.debug('tx data {}'.format(tx_data))
logg.debug('tx {}'.format(tx))
session = SessionBase.create_session()
tx_cache = TxCache(
tx_hash_hex,
tx['from'],
tx_data['to'],
tx['to'],
tx['to'],
tx_data['amount'],
tx_data['amount'],
session=session,
)
session.add(tx_cache)
session.commit()
cache_id = tx_cache.id
session.close()
return (tx_hash_hex, cache_id)
class ExtendedTx:
_default_decimals = 6
def __init__(self, tx_hash, chain_spec):
self._chain_spec = chain_spec
self.chain = str(chain_spec)
self.hash = tx_hash
self.sender = None
self.sender_label = None
self.recipient = None
self.recipient_label = None
self.source_token_value = 0
self.destination_token_value = 0
self.source_token = zero_address
self.destination_token = zero_address
self.source_token_symbol = ''
self.destination_token_symbol = ''
self.source_token_decimals = ExtendedTx._default_decimals
self.destination_token_decimals = ExtendedTx._default_decimals
self.status = TxStatus.PENDING.name
self.status_code = TxStatus.PENDING.value
def set_actors(self, sender, recipient, trusted_declarator_addresses=None):
self.sender = sender
self.recipient = recipient
if trusted_declarator_addresses != None:
self.sender_label = translate_address(sender, trusted_declarator_addresses, self.chain)
self.recipient_label = translate_address(recipient, trusted_declarator_addresses, self.chain)
def set_tokens(self, source, source_value, destination=None, destination_value=None):
if destination == None:
destination = source
if destination_value == None:
destination_value = source_value
st = CICRegistry.get_address(self._chain_spec, source)
dt = CICRegistry.get_address(self._chain_spec, destination)
self.source_token = source
self.source_token_symbol = st.symbol()
self.source_token_decimals = st.decimals()
self.source_token_value = source_value
self.destination_token = destination
self.destination_token_symbol = dt.symbol()
self.destination_token_decimals = dt.decimals()
self.destination_token_value = destination_value
def set_status(self, n):
if n:
self.status = TxStatus.ERROR.name
else:
self.status = TxStatus.SUCCESS.name
self.status_code = n
def to_dict(self):
o = {}
for attr in dir(self):
if attr[0] == '_' or attr in ['set_actors', 'set_tokens', 'set_status', 'to_dict']:
continue
o[attr] = getattr(self, attr)
return o