# standard imports
import logging
import math

# third-party imports
import celery
import moolb
from chainlib.chain import ChainSpec
from chainlib.connection import RPCConnection
from chainlib.eth.tx import (
        unpack,
        transaction_by_block,
        receipt,
        Tx,
        )
from chainlib.eth.error import RequestMismatchException
from chainlib.eth.block import block_by_number
from chainlib.eth.contract import abi_decode_single
from chainlib.eth.constant import ZERO_ADDRESS
from hexathon import strip_0x
from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
from cic_eth_registry.error import UnknownContractError
from chainqueue.db.models.otx import Otx
from chainqueue.db.enum import StatusEnum
from chainqueue.sql.query import get_tx_cache
from eth_erc20 import ERC20
from erc20_faucet import Faucet

# local imports
from cic_eth.queue.time import tx_times
from cic_eth.task import BaseTask
from cic_eth.db.models.base import SessionBase

celery_app = celery.current_app
logg = logging.getLogger()

# Safety cap on the number of transactions inspected per block.
MAX_BLOCK_TX = 250
def parse_transaction(chain_spec, rpc, tx, sender_address=None):
    """Try to decode a transaction's input data as either an ERC20 transfer
    or a faucet give-to request.

    :param chain_spec: Chain spec of the chain the transaction belongs to
    :type chain_spec: chainlib.chain.ChainSpec
    :param rpc: RPC connection, used to resolve the faucet payout amount
    :type rpc: chainlib.connection.RPCConnection
    :param tx: Transaction src dict; must contain 'input', 'hash', 'from', 'to' and 'block_number'
    :type tx: dict
    :param sender_address: Sender address to use for read-only contract calls
    :type sender_address: str, 0x-hex
    :returns: Tuple of (recipient address, token value) on a match, None otherwise
    :rtype: tuple or None
    """
    # ERC20 transfer: recipient and value are encoded directly in the call data.
    try:
        transfer_data = ERC20.parse_transfer_request(tx['input'])
        tx_address = transfer_data[0]
        tx_token_value = transfer_data[1]
        logg.debug('matched transfer transaction {} in block {} sender {} recipient {} value {}'.format(tx['hash'], tx['block_number'], tx['from'], tx_address, tx_token_value))
        return (tx_address, tx_token_value)
    except RequestMismatchException:
        pass

    # Faucet giveto: the payout amount is not part of the call data, so it is
    # queried from the faucet contract at the transaction's block height.
    try:
        transfer_data = Faucet.parse_give_to_request(tx['input'])
        tx_address = transfer_data[0]
        c = Faucet(chain_spec)
        o = c.token_amount(tx['to'], sender_address=sender_address, height=tx['block_number'])
        r = rpc.do(o)
        tx_token_value = Faucet.parse_token_amount(r)
        logg.debug('matched giveto transaction {} in block {} sender {} recipient {} value {}'.format(tx['hash'], tx['block_number'], tx['from'], tx_address, tx_token_value))
        return (tx_address, tx_token_value)
    except RequestMismatchException:
        pass

    # Input matched neither known method signature.
    return None
2021-02-17 10:33:18 +01:00
# TODO: Make this method easier to read
@celery_app.task(bind=True, base=BaseTask)
def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
    """Retrieve external transaction data matching the provided filter.

    The bloom filter representation has the following structure (the size of
    the filter will be inferred from the size of the provided filter data):
    {
        'alg': <str; hashing algorithm, currently only "sha256" is understood>,
        'high': <number; highest block number in matched set>,
        'low': <number; lowest block number in matched set>,
        'filter_rounds': <number; hashing rounds used to generate filter entry>,
        'block_filter': <hex; bloom filter data with block matches>,
        'blocktx_filter': <hex; bloom filter data with block+tx matches>,
    }

    :param bloomspec: Bloom filter data
    :type bloomspec: dict (see description above)
    :param address: Recipient address to use in matching
    :type address: str, 0x-hex
    :param chain_spec_dict: Chain spec dict representation
    :type chain_spec_dict: dict
    :returns: dict of transaction data as dict, keyed by transaction hash
    :rtype: dict of dict
    """
    chain_spec = ChainSpec.from_dict(chain_spec_dict)
    chain_str = str(chain_spec)
    rpc = RPCConnection.connect(chain_spec, 'default')
    # NOTE(review): registry is never read below (see commented-out lookup
    # further down); kept in case the constructor has side effects — confirm
    # before removing.
    registry = CICRegistry(chain_spec, rpc)

    block_filter_data = bytes.fromhex(bloomspec['block_filter'])
    tx_filter_data = bytes.fromhex(bloomspec['blocktx_filter'])
    # Filter size in bits is inferred from the filter data length.
    databitlen = len(block_filter_data) * 8
    block_filter = moolb.Bloom(databitlen, bloomspec['filter_rounds'], default_data=block_filter_data)
    tx_filter = moolb.Bloom(databitlen, bloomspec['filter_rounds'], default_data=tx_filter_data)

    txs = {}
    logg.debug('processing filter with span low {} to high {}'.format(bloomspec['low'], bloomspec['high']))
    for block_height in range(bloomspec['low'], bloomspec['high']):
        block_height_bytes = block_height.to_bytes(4, 'big')
        if not block_filter.check(block_height_bytes):
            continue
        logg.debug('filter matched block {}'.format(block_height))
        o = block_by_number(block_height)
        block = rpc.do(o)
        logg.debug('block {}'.format(block))

        for tx_index in range(0, len(block['transactions'])):
            tx_index_bytes = tx_index.to_bytes(4, 'big')
            # The tx filter is keyed on the composite of block height and tx index.
            composite = block_height_bytes + tx_index_bytes
            if not tx_filter.check(composite):
                continue
            logg.debug('filter matched block {} tx {}'.format(block_height, tx_index))
            try:
                o = transaction_by_block(block['hash'], tx_index)
                tx = rpc.do(o)
            except Exception as e:
                # Bloom filters can yield false positives; skip the miss.
                logg.debug('false positive on block {} tx {} ({})'.format(block_height, tx_index, e))
                continue
            tx = Tx(tx).src()
            logg.debug('got tx {}'.format(tx))

            transfer_data = parse_transaction(chain_spec, rpc, tx, sender_address=BaseTask.call_address)
            if transfer_data is None:
                continue
            tx_address = transfer_data[0]
            tx_token_value = transfer_data[1]

            if address == tx_address:
                status = StatusEnum.SENT
                try:
                    o = receipt(tx['hash'])
                    rcpt = rpc.do(o)
                    if rcpt['status'] == 0:
                        pending = StatusEnum.REVERTED
                    else:
                        pending = StatusEnum.SUCCESS
                except Exception as e:
                    # Best effort: a failed receipt lookup does not invalidate the match.
                    logg.error('skipping receipt lookup for {}: {}'.format(tx['hash'], e))

                # TODO: pass through registry to validate declarator entry of token
                #token = registry.by_address(tx['to'], sender_address=self.call_address)
                times = tx_times(tx['hash'], chain_spec)
                tx_r = {
                    'hash': tx['hash'],
                    'sender': tx['from'],
                    'recipient': tx_address,
                    'source_value': tx_token_value,
                    'destination_value': tx_token_value,
                    'source_token': tx['to'],
                    'destination_token': tx['to'],
                    'nonce': tx['nonce'],
                }
                # Prefer the local queue timestamp when the tx is known to the queue.
                if times['queue'] is not None:
                    tx_r['date_created'] = times['queue']
                else:
                    tx_r['date_created'] = times['network']
                txs[tx['hash']] = tx_r
                break
    return txs
2021-07-25 15:07:28 +02:00
2021-02-17 10:33:18 +01:00
# TODO: Surely it must be possible to optimize this
# TODO: DRY this with callback filter in cic_eth/runnable/manager
# TODO: Remove redundant fields from end representation (timestamp, tx_hash)
@celery_app.task(bind=True, base=BaseTask)
def tx_collate(self, tx_batches, chain_spec_dict, offset, limit, newest_first=True, verify_contracts=True):
    """Merges transaction data from multiple sources and sorts them in chronological order.

    :param tx_batches: Transaction data inputs
    :type tx_batches: lists of lists of transaction data
    :param chain_spec_dict: Chain spec dict representation
    :type chain_spec_dict: dict
    :param offset: Number of sorted results to skip (not yet implemented)
    :type offset: number
    :param limit: Maximum number of results to return (not yet implemented)
    :type limit: number
    :param newest_first: If True, returns results in reverse chronological order
    :type newest_first: bool
    :param verify_contracts: If True, verify and expand token data for each entry
    :type verify_contracts: bool
    :returns: Transactions
    :rtype: list
    """
    chain_spec = ChainSpec.from_dict(chain_spec_dict)

    if isinstance(tx_batches, dict):
        tx_batches = [tx_batches]

    txs_by_key = {}
    session = SessionBase.create_session()
    try:
        for b in tx_batches:
            for v in b.values():
                tx = None
                try:
                    # Entry is a raw signed tx as hex; resolve it through the
                    # local queue cache.
                    hx = strip_0x(v)
                    tx = unpack(bytes.fromhex(hx), chain_spec)
                    txc = get_tx_cache(chain_spec, tx['hash'], session)
                    txc['timestamp'] = int(txc['date_created'].timestamp())
                    txc['hash'] = txc['tx_hash']
                    tx = txc
                except TypeError:
                    # Entry is already a tx dict (e.g. from list_tx_by_bloom).
                    tx = v
                    tx['timestamp'] = tx['date_created']
                # Sort key: timestamp first (chronological), sender and nonce
                # as tie breakers.
                k = '{}.{}.{}'.format(tx['timestamp'], tx['sender'], tx['nonce'])
                txs_by_key[k] = tx
    finally:
        # Release the db session also when an entry fails to parse.
        session.close()

    ks = list(txs_by_key.keys())
    ks.sort()
    if newest_first:
        ks.reverse()

    txs = []
    for k in ks:
        tx = txs_by_key[k]
        if verify_contracts:
            try:
                tx = verify_and_expand(tx, chain_spec, sender_address=BaseTask.call_address)
            except UnknownContractError:
                logg.error('verify failed on tx {}, skipping'.format(tx['hash']))
                continue
        txs.append(tx)

    return txs
2021-07-14 17:06:20 +02:00
def verify_and_expand(tx, chain_spec, sender_address=ZERO_ADDRESS):
    """Fill in token symbol and decimals for the source and destination
    tokens of a transaction entry, skipping fields already present and
    zero-address placeholders.

    The registry lookup doubles as verification of the token contract; an
    unregistered address makes the lookup raise, which the caller may catch
    as UnknownContractError.

    :param tx: Transaction entry containing 'source_token' and 'destination_token' addresses
    :type tx: dict
    :param chain_spec: Chain spec of the chain the transaction belongs to
    :type chain_spec: chainlib.chain.ChainSpec
    :param sender_address: Sender address to use for read-only contract calls
    :type sender_address: str, 0x-hex
    :returns: The same tx dict with token symbol/decimals fields populated
    :rtype: dict
    """
    rpc = RPCConnection.connect(chain_spec, 'default')
    registry = CICRegistry(chain_spec, rpc)
    if tx.get('source_token_symbol') is None and tx['source_token'] != ZERO_ADDRESS:
        # Lookup result is unused; the call is kept for its verification effect.
        registry.by_address(tx['source_token'], sender_address=sender_address)
        token = ERC20Token(chain_spec, rpc, tx['source_token'])
        tx['source_token_symbol'] = token.symbol
        tx['source_token_decimals'] = token.decimals
    if tx.get('destination_token_symbol') is None and tx['destination_token'] != ZERO_ADDRESS:
        registry.by_address(tx['destination_token'], sender_address=sender_address)
        token = ERC20Token(chain_spec, rpc, tx['destination_token'])
        tx['destination_token_symbol'] = token.symbol
        tx['destination_token_decimals'] = token.decimals
    return tx