Rehabilitate transaction list api and test

This commit is contained in:
nolash 2021-03-21 21:10:56 +01:00
parent 267ce84caa
commit 0ab8675d19
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
5 changed files with 199 additions and 45 deletions

View File

@ -3,21 +3,28 @@ import logging
import math
# third-pary imports
import web3
import celery
import moolb
from cic_registry.chain import ChainSpec
from chainlib.chain import ChainSpec
from chainlib.connection import RPCConnection
from chainlib.eth.tx import (
unpack,
transaction_by_block,
receipt,
)
from chainlib.eth.block import block_by_number
from chainlib.eth.contract import abi_decode_single
from chainlib.eth.erc20 import ERC20
from hexathon import strip_0x
from cic_eth_registry import CICRegistry
from cic_eth_registry.erc20 import ERC20Token
# local imports
from cic_eth.registry import safe_registry
from cic_eth.eth.rpc import RpcClient
from cic_eth.db.models.otx import Otx
from cic_eth.eth.util import unpack_signed_raw_tx
from cic_eth.db.enum import StatusEnum
from cic_eth.eth.token import unpack_transfer
from cic_eth.queue.tx import get_tx_cache
from cic_eth.queue.time import tx_times
from cic_eth.task import BaseTask
celery_app = celery.current_app
logg = logging.getLogger()
@ -26,8 +33,8 @@ MAX_BLOCK_TX = 250
# TODO: Make this method easier to read
@celery_app.task()
def list_tx_by_bloom(bloomspec, address, chain_str):
@celery_app.task(bind=True, base=BaseTask)
def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict):
"""Retrieve external transaction data matching the provided filter
The bloom filter representation with the following structure (the size of the filter will be inferred from the size of the provided filter data):
@ -49,9 +56,11 @@ def list_tx_by_bloom(bloomspec, address, chain_str):
:returns: dict of transaction data as dict, keyed by transaction hash
:rtype: dict of dict
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
registry = safe_registry(c.w3)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
chain_str = str(chain_spec)
rpc = RPCConnection.connect(chain_spec, 'default')
registry = CICRegistry(chain_spec, rpc)
block_filter_data = bytes.fromhex(bloomspec['block_filter'])
tx_filter_data = bytes.fromhex(bloomspec['blocktx_filter'])
databitlen = len(block_filter_data)*8
@ -63,47 +72,53 @@ def list_tx_by_bloom(bloomspec, address, chain_str):
block_height_bytes = block_height.to_bytes(4, 'big')
if block_filter.check(block_height_bytes):
logg.debug('filter matched block {}'.format(block_height))
block = c.w3.eth.getBlock(block_height, True)
o = block_by_number(block_height)
block = rpc.do(o)
logg.debug('block {}'.format(block))
for tx_index in range(0, len(block.transactions)):
for tx_index in range(0, len(block['transactions'])):
composite = tx_index + block_height
tx_index_bytes = composite.to_bytes(4, 'big')
if tx_filter.check(tx_index_bytes):
logg.debug('filter matched block {} tx {}'.format(block_height, tx_index))
try:
tx = c.w3.eth.getTransactionByBlock(block_height, tx_index)
except web3.exceptions.TransactionNotFound:
logg.debug('false positive on block {} tx {}'.format(block_height, tx_index))
#tx = c.w3.eth.getTransactionByBlock(block_height, tx_index)
o = transaction_by_block(block['hash'], tx_index)
tx = rpc.do(o)
except Exception as e:
logg.debug('false positive on block {} tx {} ({})'.format(block_height, tx_index, e))
continue
tx_address = None
tx_token_value = 0
try:
transfer_data = unpack_transfer(tx['data'])
tx_address = transfer_data['to']
tx_token_value = transfer_data['amount']
transfer_data = ERC20.parse_transfer_request(tx['data'])
tx_address = transfer_data[0]
tx_token_value = transfer_data[1]
except ValueError:
logg.debug('not a transfer transaction, skipping {}'.format(tx))
continue
if address == tx_address:
status = StatusEnum.SENT
try:
rcpt = c.w3.eth.getTransactionReceipt(tx.hash)
o = receipt(tx['hash'])
rcpt = rpc.do(o)
if rcpt['status'] == 0:
pending = StatusEnum.REVERTED
else:
pending = StatusEnum.SUCCESS
except web3.exceptions.TransactionNotFound:
except Exception as e:
logg.error('skipping receipt lookup for {}: {}'.format(tx['hash'], e))
pass
tx_hash_hex = tx['hash'].hex()
token = registry.get_address(chain_spec, tx['to'])
token_symbol = token.symbol()
token_decimals = token.decimals()
times = tx_times(tx_hash_hex, chain_str)
# TODO: pass through registry to validate declarator entry of token
#token = registry.by_address(tx['to'], sender_address=self.call_address)
token = ERC20Token(rpc, tx['to'])
token_symbol = token.symbol
token_decimals = token.decimals
times = tx_times(tx['hash'], chain_spec)
tx_r = {
'hash': tx_hash_hex,
'hash': tx['hash'],
'sender': tx['from'],
'recipient': tx_address,
'source_value': tx_token_value,
@ -122,7 +137,7 @@ def list_tx_by_bloom(bloomspec, address, chain_str):
tx_r['date_created'] = times['queue']
else:
tx_r['date_created'] = times['network']
txs[tx_hash_hex] = tx_r
txs[tx['hash']] = tx_r
break
return txs
@ -131,7 +146,7 @@ def list_tx_by_bloom(bloomspec, address, chain_str):
# TODO: DRY this with callback filter in cic_eth/runnable/manager
# TODO: Remove redundant fields from end representation (timestamp, tx_hash)
@celery_app.task()
def tx_collate(tx_batches, chain_str, offset, limit, newest_first=True):
def tx_collate(tx_batches, chain_spec_dict, offset, limit, newest_first=True):
"""Merges transaction data from multiple sources and sorts them in chronological order.
:param tx_batches: Transaction data inputs
@ -148,7 +163,7 @@ def tx_collate(tx_batches, chain_str, offset, limit, newest_first=True):
:rtype: list
"""
txs_by_block = {}
chain_spec = ChainSpec.from_chain_str(chain_str)
chain_spec = ChainSpec.from_dict(chain_spec_dict)
if isinstance(tx_batches, dict):
tx_batches = [tx_batches]
@ -159,7 +174,7 @@ def tx_collate(tx_batches, chain_str, offset, limit, newest_first=True):
k = None
try:
hx = strip_0x(v)
tx = unpack_signed_raw_tx(bytes.fromhex(hx), chain_spec.chain_id())
tx = unpack(bytes.fromhex(hx), chain_spec.chain_id())
txc = get_tx_cache(tx['hash'])
txc['timestamp'] = int(txc['date_created'].timestamp())
txc['hash'] = txc['tx_hash']

View File

@ -2,12 +2,13 @@
import logging
# third-party imports
import web3
import celery
from cic_registry.chain import ChainSpec
from chainlib.chain import ChainSpec
from chainlib.connection import RPCConnection
from chainlib.eth.block import block_by_hash
from chainlib.eth.tx import receipt
# local imports
from cic_eth.eth.rpc import RpcClient
from cic_eth.db.models.otx import Otx
from cic_eth.error import NotLocalTxError
from cic_eth.task import CriticalSQLAlchemyAndWeb3Task
@ -17,21 +18,21 @@ celery_app = celery.current_app
logg = logging.getLogger()
# TODO: This method does not belong in the _queue_ module, it operates across queue and network
@celery_app.task(base=CriticalSQLAlchemyAndWeb3Task)
def tx_times(tx_hash, chain_str):
chain_spec = ChainSpec.from_chain_str(chain_str)
c = RpcClient(chain_spec)
def tx_times(tx_hash, chain_spec):
rpc = RPCConnection.connect(chain_spec, 'default')
time_pair = {
'network': None,
'queue': None,
}
try:
rcpt = c.w3.eth.getTransactionReceipt(tx_hash)
block = c.w3.eth.getBlock(rcpt['blockHash'])
o = receipt(tx_hash)
r = rpc.do(o)
o = block_by_hash(r['block_hash'])
block = rpc.do(o)
logg.debug('rcpt {}'.format(block))
time_pair['network'] = block['timestamp']
except web3.exceptions.TransactionNotFound:
except Exception as e:
logg.debug('error with getting timestamp details for {}: {}'.format(tx_hash, e))
pass
otx = Otx.load(tx_hash)

View File

@ -15,7 +15,7 @@ def celery_includes():
# 'cic_eth.eth.bancor',
'cic_eth.eth.erc20',
'cic_eth.eth.tx',
# 'cic_eth.ext.tx',
'cic_eth.ext.tx',
'cic_eth.queue.tx',
'cic_eth.queue.balance',
'cic_eth.admin.ctrl',

View File

@ -8,6 +8,7 @@ from chainlib.eth.address import to_checksum_address
# local imports
from cic_eth.db.models.role import AccountRole
from cic_eth.db.models.nonce import Nonce
#logg = logging.getLogger(__name__)
# what the actual fuck, debug is not being shown even though explicitly set
@ -16,14 +17,31 @@ logg = logging.getLogger()
@pytest.fixture(scope='function')
def custodial_roles(
def init_custodial(
contract_roles,
token_roles,
agent_roles,
init_database,
):
for roles in [contract_roles, token_roles, agent_roles]:
for role in roles.values():
Nonce.init(role, session=init_database)
init_database.commit()
@pytest.fixture(scope='function')
def custodial_roles(
init_custodial,
contract_roles,
token_roles,
agent_roles,
eth_accounts,
init_database,
):
r = {}
r.update(contract_roles)
r.update(agent_roles)
r.update({
'GAS_GIFTER': eth_accounts[10],
'FOO_TOKEN_GIFTER': token_roles['FOO_TOKEN_OWNER'],

View File

@ -0,0 +1,120 @@
# standard imports
import logging
# local imports
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.erc20 import ERC20
from chainlib.eth.tx import receipt
from cic_eth.api.api_task import Api
from tests.mock.filter import (
block_filter,
tx_filter,
)
from cic_eth.db.models.nonce import (
Nonce,
NonceReservation,
)
logg = logging.getLogger()
def test_list_tx(
default_chain_spec,
init_database,
cic_registry,
eth_rpc,
eth_signer,
custodial_roles,
agent_roles,
foo_token,
register_tokens,
init_eth_tester,
celery_worker,
):
chain_id = default_chain_spec.chain_id()
tx_hashes = []
# external tx
nonce_oracle = RPCNonceOracle(custodial_roles['FOO_TOKEN_GIFTER'], eth_rpc)
nonce = nonce_oracle.get_nonce()
q = init_database.query(Nonce)
q = q.filter(Nonce.address_hex==agent_roles['ALICE'])
o = q.first()
o.nonce = nonce
init_database.add(o)
init_database.commit()
# TODO: implement cachenonceoracle instead, this is useless
# external tx one
Nonce.next(custodial_roles['FOO_TOKEN_GIFTER'], 'foo', session=init_database)
init_database.commit()
init_eth_tester.mine_blocks(13)
c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id)
(tx_hash_hex, o) = c.transfer(foo_token, custodial_roles['FOO_TOKEN_GIFTER'], agent_roles['ALICE'], 1024)
eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
assert r['status'] == 1
a = r['block_number']
block_filter.add(a.to_bytes(4, 'big'))
a = r['block_number'] + r['transaction_index']
tx_filter.add(a.to_bytes(4, 'big'))
tx_hashes.append(tx_hash_hex)
# external tx two
Nonce.next(agent_roles['ALICE'], 'foo', session=init_database)
init_database.commit()
init_eth_tester.mine_blocks(13)
nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc)
c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id)
(tx_hash_hex, o) = c.transfer(foo_token, agent_roles['ALICE'], agent_roles['BOB'], 256)
eth_rpc.do(o)
o = receipt(tx_hash_hex)
r = eth_rpc.do(o)
assert r['status'] == 1
a = r['block_number']
block_filter.add(a.to_bytes(4, 'big'))
a = r['block_number'] + r['transaction_index']
tx_filter.add(a.to_bytes(4, 'big'))
tx_hashes.append(tx_hash_hex)
init_eth_tester.mine_blocks(28)
# custodial tx 1
api = Api(str(default_chain_spec), queue=None)
t = api.transfer(agent_roles['ALICE'], agent_roles['CAROL'], 64, 'FOO') #, 'blinky')
r = t.get_leaf()
assert t.successful()
tx_hashes.append(r)
# custodial tx 2
api = Api(str(default_chain_spec), queue=None)
t = api.transfer(agent_roles['ALICE'], agent_roles['DAVE'], 16, 'FOO') #, 'blinky')
r = t.get_leaf()
assert t.successful()
tx_hashes.append(r)
logg.debug('r {}'.format(r))
# test the api
t = api.list(agent_roles['ALICE'], external_task='tests.mock.filter.filter')
r = t.get_leaf()
assert t.successful()
assert len(r) == 3
logg.debug('rrrr {}'.format(r))
for tx in r:
logg.debug('have tx {}'.format(tx))
tx_hashes.remove(tx['hash'])
assert len(tx_hashes) == 1