diff --git a/apps/cic-eth/cic_eth/ext/tx.py b/apps/cic-eth/cic_eth/ext/tx.py index f291f335..1dba80eb 100644 --- a/apps/cic-eth/cic_eth/ext/tx.py +++ b/apps/cic-eth/cic_eth/ext/tx.py @@ -3,21 +3,28 @@ import logging import math # third-pary imports -import web3 import celery import moolb -from cic_registry.chain import ChainSpec +from chainlib.chain import ChainSpec +from chainlib.connection import RPCConnection +from chainlib.eth.tx import ( + unpack, + transaction_by_block, + receipt, + ) +from chainlib.eth.block import block_by_number +from chainlib.eth.contract import abi_decode_single +from chainlib.eth.erc20 import ERC20 from hexathon import strip_0x +from cic_eth_registry import CICRegistry +from cic_eth_registry.erc20 import ERC20Token # local imports -from cic_eth.registry import safe_registry -from cic_eth.eth.rpc import RpcClient from cic_eth.db.models.otx import Otx -from cic_eth.eth.util import unpack_signed_raw_tx from cic_eth.db.enum import StatusEnum -from cic_eth.eth.token import unpack_transfer from cic_eth.queue.tx import get_tx_cache from cic_eth.queue.time import tx_times +from cic_eth.task import BaseTask celery_app = celery.current_app logg = logging.getLogger() @@ -26,8 +33,8 @@ MAX_BLOCK_TX = 250 # TODO: Make this method easier to read -@celery_app.task() -def list_tx_by_bloom(bloomspec, address, chain_str): +@celery_app.task(bind=True, base=BaseTask) +def list_tx_by_bloom(self, bloomspec, address, chain_spec_dict): """Retrieve external transaction data matching the provided filter The bloom filter representation with the following structure (the size of the filter will be inferred from the size of the provided filter data): @@ -49,9 +56,11 @@ def list_tx_by_bloom(bloomspec, address, chain_str): :returns: dict of transaction data as dict, keyed by transaction hash :rtype: dict of dict """ - chain_spec = ChainSpec.from_chain_str(chain_str) - c = RpcClient(chain_spec) - registry = safe_registry(c.w3) + chain_spec = ChainSpec.from_dict(chain_spec_dict) + chain_str = str(chain_spec) + rpc = RPCConnection.connect(chain_spec, 'default') + registry = CICRegistry(chain_spec, rpc) + block_filter_data = bytes.fromhex(bloomspec['block_filter']) tx_filter_data = bytes.fromhex(bloomspec['blocktx_filter']) databitlen = len(block_filter_data)*8 @@ -63,47 +72,53 @@ def list_tx_by_bloom(bloomspec, address, chain_str): block_height_bytes = block_height.to_bytes(4, 'big') if block_filter.check(block_height_bytes): logg.debug('filter matched block {}'.format(block_height)) - block = c.w3.eth.getBlock(block_height, True) + o = block_by_number(block_height) + block = rpc.do(o) + logg.debug('block {}'.format(block)) - for tx_index in range(0, len(block.transactions)): + for tx_index in range(0, len(block['transactions'])): composite = tx_index + block_height tx_index_bytes = composite.to_bytes(4, 'big') if tx_filter.check(tx_index_bytes): logg.debug('filter matched block {} tx {}'.format(block_height, tx_index)) try: - tx = c.w3.eth.getTransactionByBlock(block_height, tx_index) - except web3.exceptions.TransactionNotFound: - logg.debug('false positive on block {} tx {}'.format(block_height, tx_index)) + #tx = c.w3.eth.getTransactionByBlock(block_height, tx_index) + o = transaction_by_block(block['hash'], tx_index) + tx = rpc.do(o) + except Exception as e: + logg.debug('false positive on block {} tx {} ({})'.format(block_height, tx_index, e)) continue tx_address = None tx_token_value = 0 try: - transfer_data = unpack_transfer(tx['data']) - tx_address = transfer_data['to'] - tx_token_value = transfer_data['amount'] + transfer_data = ERC20.parse_transfer_request(tx['data']) + tx_address = transfer_data[0] + tx_token_value = transfer_data[1] except ValueError: logg.debug('not a transfer transaction, skipping {}'.format(tx)) continue if address == tx_address: status = StatusEnum.SENT try: - rcpt = c.w3.eth.getTransactionReceipt(tx.hash) + o = receipt(tx['hash']) + rcpt = rpc.do(o) if rcpt['status'] == 0: pending = StatusEnum.REVERTED else: pending = StatusEnum.SUCCESS - except web3.exceptions.TransactionNotFound: + except Exception as e: + logg.error('skipping receipt lookup for {}: {}'.format(tx['hash'], e)) pass - tx_hash_hex = tx['hash'].hex() - - token = registry.get_address(chain_spec, tx['to']) - token_symbol = token.symbol() - token_decimals = token.decimals() - times = tx_times(tx_hash_hex, chain_str) + # TODO: pass through registry to validate declarator entry of token + #token = registry.by_address(tx['to'], sender_address=self.call_address) + token = ERC20Token(rpc, tx['to']) + token_symbol = token.symbol + token_decimals = token.decimals + times = tx_times(tx['hash'], chain_spec) tx_r = { - 'hash': tx_hash_hex, + 'hash': tx['hash'], 'sender': tx['from'], 'recipient': tx_address, 'source_value': tx_token_value, @@ -122,7 +137,7 @@ def list_tx_by_bloom(bloomspec, address, chain_str): tx_r['date_created'] = times['queue'] else: tx_r['date_created'] = times['network'] - txs[tx_hash_hex] = tx_r + txs[tx['hash']] = tx_r break return txs @@ -131,7 +146,7 @@ def list_tx_by_bloom(bloomspec, address, chain_str): # TODO: DRY this with callback filter in cic_eth/runnable/manager # TODO: Remove redundant fields from end representation (timestamp, tx_hash) @celery_app.task() -def tx_collate(tx_batches, chain_str, offset, limit, newest_first=True): +def tx_collate(tx_batches, chain_spec_dict, offset, limit, newest_first=True): """Merges transaction data from multiple sources and sorts them in chronological order. :param tx_batches: Transaction data inputs @@ -148,7 +163,7 @@ def tx_collate(tx_batches, chain_str, offset, limit, newest_first=True): :rtype: list """ txs_by_block = {} - chain_spec = ChainSpec.from_chain_str(chain_str) + chain_spec = ChainSpec.from_dict(chain_spec_dict) if isinstance(tx_batches, dict): tx_batches = [tx_batches] @@ -159,7 +174,7 @@ def tx_collate(tx_batches, chain_str, offset, limit, newest_first=True): k = None try: hx = strip_0x(v) - tx = unpack_signed_raw_tx(bytes.fromhex(hx), chain_spec.chain_id()) + tx = unpack(bytes.fromhex(hx), chain_spec.chain_id()) txc = get_tx_cache(tx['hash']) txc['timestamp'] = int(txc['date_created'].timestamp()) txc['hash'] = txc['tx_hash'] diff --git a/apps/cic-eth/cic_eth/queue/time.py b/apps/cic-eth/cic_eth/queue/time.py index c35d03bb..f85e55c0 100644 --- a/apps/cic-eth/cic_eth/queue/time.py +++ b/apps/cic-eth/cic_eth/queue/time.py @@ -2,12 +2,13 @@ import logging # third-party imports -import web3 import celery -from cic_registry.chain import ChainSpec +from chainlib.chain import ChainSpec +from chainlib.connection import RPCConnection +from chainlib.eth.block import block_by_hash +from chainlib.eth.tx import receipt # local imports -from cic_eth.eth.rpc import RpcClient from cic_eth.db.models.otx import Otx from cic_eth.error import NotLocalTxError from cic_eth.task import CriticalSQLAlchemyAndWeb3Task @@ -17,21 +18,21 @@ celery_app = celery.current_app logg = logging.getLogger() -# TODO: This method does not belong in the _queue_ module, it operates across queue and network -@celery_app.task(base=CriticalSQLAlchemyAndWeb3Task) -def tx_times(tx_hash, chain_str): - chain_spec = ChainSpec.from_chain_str(chain_str) - c = RpcClient(chain_spec) +def tx_times(tx_hash, chain_spec): + rpc = RPCConnection.connect(chain_spec, 'default') time_pair = { 'network': None, 'queue': None, } try: - rcpt = c.w3.eth.getTransactionReceipt(tx_hash) - block = c.w3.eth.getBlock(rcpt['blockHash']) + o = receipt(tx_hash) + r = rpc.do(o) + o = block_by_hash(r['block_hash']) + block = rpc.do(o) logg.debug('rcpt {}'.format(block)) time_pair['network'] = block['timestamp'] - except web3.exceptions.TransactionNotFound: + except Exception as e: + logg.debug('error with getting timestamp details for {}: {}'.format(tx_hash, e)) pass otx = Otx.load(tx_hash) diff --git a/apps/cic-eth/tests/fixtures_celery.py b/apps/cic-eth/tests/fixtures_celery.py index 257c3f30..f7ac5320 100644 --- a/apps/cic-eth/tests/fixtures_celery.py +++ b/apps/cic-eth/tests/fixtures_celery.py @@ -15,7 +15,7 @@ def celery_includes(): # 'cic_eth.eth.bancor', 'cic_eth.eth.erc20', 'cic_eth.eth.tx', -# 'cic_eth.ext.tx', + 'cic_eth.ext.tx', 'cic_eth.queue.tx', 'cic_eth.queue.balance', 'cic_eth.admin.ctrl', diff --git a/apps/cic-eth/tests/fixtures_role.py b/apps/cic-eth/tests/fixtures_role.py index 9ad12659..1ededfc1 100644 --- a/apps/cic-eth/tests/fixtures_role.py +++ b/apps/cic-eth/tests/fixtures_role.py @@ -8,6 +8,7 @@ from chainlib.eth.address import to_checksum_address # local imports from cic_eth.db.models.role import AccountRole +from cic_eth.db.models.nonce import Nonce #logg = logging.getLogger(__name__) # what the actual fuck, debug is not being shown even though explicitly set @@ -16,14 +17,31 @@ logg = logging.getLogger() @pytest.fixture(scope='function') -def custodial_roles( +def init_custodial( contract_roles, token_roles, + agent_roles, + init_database, + ): + for roles in [contract_roles, token_roles, agent_roles]: + for role in roles.values(): + Nonce.init(role, session=init_database) + + init_database.commit() + + +@pytest.fixture(scope='function') +def custodial_roles( + init_custodial, + contract_roles, + token_roles, + agent_roles, eth_accounts, init_database, ): r = {} r.update(contract_roles) + r.update(agent_roles) r.update({ 'GAS_GIFTER': eth_accounts[10], 'FOO_TOKEN_GIFTER': token_roles['FOO_TOKEN_OWNER'], diff --git a/apps/cic-eth/tests/task/api/test_list.py b/apps/cic-eth/tests/task/api/test_list.py new file mode 100644 index 00000000..22c27a37 --- /dev/null +++ b/apps/cic-eth/tests/task/api/test_list.py @@ -0,0 +1,120 @@ +# standard imports +import logging + +# local imports +from chainlib.eth.nonce import RPCNonceOracle +from chainlib.eth.erc20 import ERC20 +from chainlib.eth.tx import receipt +from cic_eth.api.api_task import Api +from tests.mock.filter import ( + block_filter, + tx_filter, + ) +from cic_eth.db.models.nonce import ( + Nonce, + NonceReservation, + ) + +logg = logging.getLogger() + + +def test_list_tx( + default_chain_spec, + init_database, + cic_registry, + eth_rpc, + eth_signer, + custodial_roles, + agent_roles, + foo_token, + register_tokens, + init_eth_tester, + celery_worker, + ): + + chain_id = default_chain_spec.chain_id() + + tx_hashes = [] + + # external tx + nonce_oracle = RPCNonceOracle(custodial_roles['FOO_TOKEN_GIFTER'], eth_rpc) + nonce = nonce_oracle.get_nonce() + + q = init_database.query(Nonce) + q = q.filter(Nonce.address_hex==agent_roles['ALICE']) + o = q.first() + o.nonce = nonce + init_database.add(o) + init_database.commit() + + # TODO: implement cachenonceoracle instead, this is useless + # external tx one + Nonce.next(custodial_roles['FOO_TOKEN_GIFTER'], 'foo', session=init_database) + init_database.commit() + + init_eth_tester.mine_blocks(13) + c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id) + (tx_hash_hex, o) = c.transfer(foo_token, custodial_roles['FOO_TOKEN_GIFTER'], agent_roles['ALICE'], 1024) + eth_rpc.do(o) + o = receipt(tx_hash_hex) + r = eth_rpc.do(o) + assert r['status'] == 1 + a = r['block_number'] + block_filter.add(a.to_bytes(4, 'big')) + + a = r['block_number'] + r['transaction_index'] + tx_filter.add(a.to_bytes(4, 'big')) + + tx_hashes.append(tx_hash_hex) + + # external tx two + Nonce.next(agent_roles['ALICE'], 'foo', session=init_database) + init_database.commit() + + init_eth_tester.mine_blocks(13) + nonce_oracle = RPCNonceOracle(agent_roles['ALICE'], eth_rpc) + c = ERC20(signer=eth_signer, nonce_oracle=nonce_oracle, chain_id=chain_id) + (tx_hash_hex, o) = c.transfer(foo_token, agent_roles['ALICE'], agent_roles['BOB'], 256) + eth_rpc.do(o) + o = receipt(tx_hash_hex) + r = eth_rpc.do(o) + assert r['status'] == 1 + a = r['block_number'] + block_filter.add(a.to_bytes(4, 'big')) + + a = r['block_number'] + r['transaction_index'] + tx_filter.add(a.to_bytes(4, 'big')) + + tx_hashes.append(tx_hash_hex) + + init_eth_tester.mine_blocks(28) + + # custodial tx 1 + api = Api(str(default_chain_spec), queue=None) + t = api.transfer(agent_roles['ALICE'], agent_roles['CAROL'], 64, 'FOO') #, 'blinky') + r = t.get_leaf() + assert t.successful() + tx_hashes.append(r) + + # custodial tx 2 + api = Api(str(default_chain_spec), queue=None) + t = api.transfer(agent_roles['ALICE'], agent_roles['DAVE'], 16, 'FOO') #, 'blinky') + r = t.get_leaf() + assert t.successful() + tx_hashes.append(r) + + logg.debug('r {}'.format(r)) + + # test the api + t = api.list(agent_roles['ALICE'], external_task='tests.mock.filter.filter') + r = t.get_leaf() + assert t.successful() + + + assert len(r) == 3 + logg.debug('rrrr {}'.format(r)) + + for tx in r: + logg.debug('have tx {}'.format(tx)) + tx_hashes.remove(tx['hash']) + assert len(tx_hashes) == 1