diff --git a/apps/cic-eth/cic_eth/api/api_task.py b/apps/cic-eth/cic_eth/api/api_task.py index 6764b1c3..a92bd501 100644 --- a/apps/cic-eth/cic_eth/api/api_task.py +++ b/apps/cic-eth/cic_eth/api/api_task.py @@ -30,7 +30,7 @@ class Api: :param queue: Name of worker queue to submit tasks to :type queue: str """ - def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop', callback_queue=None): + def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop.noop', callback_queue=None): self.chain_str = chain_str self.chain_spec = ChainSpec.from_chain_str(chain_str) self.callback_param = callback_param @@ -301,13 +301,15 @@ class Api: return t - def balance(self, address, token_symbol): + def balance(self, address, token_symbol, include_pending=True): """Calls the provided callback with the current token balance of the given address. :param address: Ethereum address of holder :type address: str, 0x-hex :param token_symbol: ERC20 token symbol of token to send :type token_symbol: str + :param include_pending: If set, will include transactions that have not yet been fully processed + :type include_pending: bool :returns: uuid of root task :rtype: celery.Task """ @@ -330,14 +332,45 @@ class Api: ], queue=self.queue, ) - - if self.callback_param != None: - s_balance.link(self.callback_success) - s_tokens.link(s_balance).on_error(self.callback_error) - else: - s_tokens.link(s_balance) + s_result = celery.signature( + 'cic_eth.queue.balance.assemble_balances', + [], + queue=self.queue, + ) - t = s_tokens.apply_async(queue=self.queue) + last_in_chain = s_balance + if include_pending: + s_balance_incoming = celery.signature( + 'cic_eth.queue.balance.balance_incoming', + [ + address, + self.chain_str, + ], + queue=self.queue, + ) + s_balance_outgoing = celery.signature( + 'cic_eth.queue.balance.balance_outgoing', + [ + address, + self.chain_str, + ], + queue=self.queue, + ) + s_balance.link(s_balance_incoming) + s_balance_incoming.link(s_balance_outgoing) + last_in_chain = s_balance_outgoing + + one = celery.chain(s_tokens, s_balance) + two = celery.chain(s_tokens, s_balance_incoming) + three = celery.chain(s_tokens, s_balance_outgoing) + + t = None + if self.callback_param != None: + s_result.link(self.callback_success).on_error(self.callback_error) + t = celery.chord([one, two, three])(s_result) + else: + t = celery.chord([one, two, three])(s_result) + return t diff --git a/apps/cic-eth/cic_eth/callbacks/noop.py b/apps/cic-eth/cic_eth/callbacks/noop.py index b064e40d..2fd579e3 100644 --- a/apps/cic-eth/cic_eth/callbacks/noop.py +++ b/apps/cic-eth/cic_eth/callbacks/noop.py @@ -18,4 +18,4 @@ def noop(self, result, param, status_code): :rtype: bool """ logg.info('noop callback {} {} {}'.format(result, param, status_code)) - return True + return result diff --git a/apps/cic-eth/cic_eth/db/enum.py b/apps/cic-eth/cic_eth/db/enum.py index de20d31a..9b77ba18 100644 --- a/apps/cic-eth/cic_eth/db/enum.py +++ b/apps/cic-eth/cic_eth/db/enum.py @@ -4,20 +4,23 @@ import enum @enum.unique class StatusBits(enum.IntEnum): - QUEUED = 0x01 - IN_NETWORK = 0x08 + """Individual bit flags that are combined to define the state and legacy of a queued transaction + + """ + QUEUED = 0x01 # transaction should be sent to network + IN_NETWORK = 0x08 # transaction is in network - DEFERRED = 0x10 - GAS_ISSUES = 0x20 + DEFERRED = 0x10 # an attempt to send the transaction to network has failed + GAS_ISSUES = 0x20 # transaction is pending sender account gas funding - LOCAL_ERROR = 0x100 - NODE_ERROR = 0x200 - NETWORK_ERROR = 0x400 - UNKNOWN_ERROR = 0x800 + LOCAL_ERROR = 0x100 # errors that originate internally from the component + NODE_ERROR = 0x200 # errors originating in the node (invalid RLP input...) + NETWORK_ERROR = 0x400 # errors that originate from the network (REVERT) + UNKNOWN_ERROR = 0x800 # unclassified errors (the should not occur) - FINAL = 0x1000 - OBSOLETE = 0x2000 - MANUAL = 0x8000 + FINAL = 0x1000 # transaction processing has completed + OBSOLETE = 0x2000 # transaction has been replaced by a different transaction with higher fee + MANUAL = 0x8000 # transaction processing has been manually overridden @enum.unique @@ -79,6 +82,19 @@ class LockEnum(enum.IntEnum): def status_str(v, bits_only=False): + """Render a human-readable string describing the status + + If the bit field exactly matches a StatusEnum value, the StatusEnum label will be returned. + + If a StatusEnum cannot be matched, the string will be postfixed with "*", unless explicitly instructed to return bit field labels only. + + :param v: Status bit field + :type v: number + :param bits_only: Only render individual bit labels. + :type bits_only: bool + :returns: Status string + :rtype: str + """ s = '' if not bits_only: try: @@ -100,14 +116,39 @@ def status_str(v, bits_only=False): def all_errors(): + """Bit mask of all error states + + :returns: Error flags + :rtype: number + """ return StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR def is_error_status(v): + """Check if value is an error state + + :param v: Status bit field + :type v: number + :returns: True if error + :rtype: bool + """ return bool(v & all_errors()) +def dead(): + """Bit mask defining whether a transaction is still likely to be processed on the network. + + :returns: Bit mask + :rtype: number + """ + return StatusBits.FINAL | StatusBits.OBSOLETE + + def is_alive(v): - return bool(v & (StatusBits.FINAL | StatusBits.OBSOLETE) == 0) + """Check if transaction is still likely to be processed on the network. + The contingency of "likely" refers to the case a transaction has been obsoleted after sent to the network, but the network still confirms the obsoleted transaction. The return value of this method will not change as a result of this, BUT the state itself will (as the FINAL bit will be set). + :returns: + """ + return bool(v & dead() == 0) diff --git a/apps/cic-eth/cic_eth/db/models/otx.py b/apps/cic-eth/cic_eth/db/models/otx.py index c22895be..0816d846 100644 --- a/apps/cic-eth/cic_eth/db/models/otx.py +++ b/apps/cic-eth/cic_eth/db/models/otx.py @@ -287,7 +287,6 @@ class Otx(SessionBase): self.__set_status(StatusBits.IN_NETWORK, session) self.__reset_status(StatusBits.DEFERRED | StatusBits.QUEUED | StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR, session) - logg.debug('<<< status {}'.format(status_str(self.status))) if self.tracing: self.__state_log(session=session) diff --git a/apps/cic-eth/cic_eth/eth/token.py b/apps/cic-eth/cic_eth/eth/token.py index 5fd77c12..60ffec9c 100644 --- a/apps/cic-eth/cic_eth/eth/token.py +++ b/apps/cic-eth/cic_eth/eth/token.py @@ -187,7 +187,6 @@ def balance(tokens, holder_address, chain_str): """ #abi = ContractRegistry.abi('ERC20Token') chain_spec = ChainSpec.from_chain_str(chain_str) - balances = [] c = RpcClient(chain_spec) for t in tokens: #token = CICRegistry.get_address(t['address']) @@ -195,9 +194,9 @@ def balance(tokens, holder_address, chain_str): #o = c.w3.eth.contract(abi=abi, address=t['address']) o = CICRegistry.get_address(chain_spec, t['address']).contract b = o.functions.balanceOf(holder_address).call() - logg.debug('balance {} for {}: {}'.format(t['address'], holder_address, b)) - balances.append(b) - return b + t['balance_network'] = b + + return tokens @celery_app.task(bind=True) @@ -326,7 +325,7 @@ def resolve_tokens_by_symbol(token_symbols, chain_str): token = CICRegistry.get_token(chain_spec, token_symbol) tokens.append({ 'address': token.address(), - #'converters': [], + 'converters': [], }) return tokens diff --git a/apps/cic-eth/cic_eth/eth/tx.py b/apps/cic-eth/cic_eth/eth/tx.py index 562af162..c4a3fca6 100644 --- a/apps/cic-eth/cic_eth/eth/tx.py +++ b/apps/cic-eth/cic_eth/eth/tx.py @@ -344,7 +344,6 @@ def send(self, txs, chain_str): chain_spec = ChainSpec.from_chain_str(chain_str) - tx_hex = txs[0] logg.debug('send transaction {}'.format(tx_hex)) diff --git a/apps/cic-eth/cic_eth/queue/balance.py b/apps/cic-eth/cic_eth/queue/balance.py new file mode 100644 index 00000000..5c035167 --- /dev/null +++ b/apps/cic-eth/cic_eth/queue/balance.py @@ -0,0 +1,120 @@ +# standard imports +import logging + +# third-party imports +import celery +from hexathon import strip_0x + +# local imports +from cic_registry.chain import ChainSpec +from cic_eth.db import SessionBase +from cic_eth.db.models.otx import Otx +from cic_eth.db.models.tx import TxCache +from cic_eth.db.enum import ( + StatusBits, + dead, + ) + +celery_app = celery.current_app + +logg = logging.getLogger() + + +def __balance_outgoing_compatible(token_address, holder_address, chain_str): + session = SessionBase.create_session() + q = session.query(TxCache.from_value) + q = q.join(Otx) + q = q.filter(TxCache.sender==holder_address) + status_compare = dead() + q = q.filter(Otx.status.op('&')(status_compare)==0) + q = q.filter(TxCache.source_token_address==token_address) + delta = 0 + for r in q.all(): + delta += int(r[0]) + session.close() + return delta + + +@celery_app.task() +def balance_outgoing(tokens, holder_address, chain_str): + """Retrieve accumulated value of unprocessed transactions sent from the given address. + + :param tokens: list of token spec dicts with addresses to retrieve balances for + :type tokens: list of str, 0x-hex + :param holder_address: Sender address + :type holder_address: str, 0x-hex + :param chain_str: Chain spec string representation + :type chain_str: str + :returns: Tokens dicts with outgoing balance added + :rtype: dict + """ + chain_spec = ChainSpec.from_chain_str(chain_str) + for t in tokens: + b = __balance_outgoing_compatible(t['address'], holder_address, chain_str) + t['balance_outgoing'] = b + + return tokens + + +def __balance_incoming_compatible(token_address, receiver_address, chain_str): + session = SessionBase.create_session() + q = session.query(TxCache.to_value) + q = q.join(Otx) + q = q.filter(TxCache.recipient==receiver_address) + status_compare = dead() + q = q.filter(Otx.status.op('&')(status_compare)==0) + # TODO: this can change the result for the recipient if tx is later obsoleted and resubmission is delayed. + q = q.filter(Otx.status.op('&')(StatusBits.IN_NETWORK)==StatusBits.IN_NETWORK) + q = q.filter(TxCache.destination_token_address==token_address) + delta = 0 + for r in q.all(): + delta += int(r[0]) + session.close() + return delta + + +@celery_app.task() +def balance_incoming(tokens, receipient_address, chain_str): + """Retrieve accumulated value of unprocessed transactions to be received by the given address. + + :param tokens: list of token spec dicts with addresses to retrieve balances for + :type tokens: list of str, 0x-hex + :param holder_address: Recipient address + :type holder_address: str, 0x-hex + :param chain_str: Chain spec string representation + :type chain_str: str + :returns: Tokens dicts with outgoing balance added + :rtype: dict + """ + chain_spec = ChainSpec.from_chain_str(chain_str) + for t in tokens: + b = __balance_incoming_compatible(t['address'], receipient_address, chain_str) + t['balance_incoming'] = b + + return tokens + + +@celery_app.task() +def assemble_balances(balances_collection): + """Combines token spec dicts with individual balances into a single token spec dict. + + A "balance" means any field that is keyed with a string starting with "balance_" + + :param balances_collection: Token spec dicts + :type balances_collection: list of lists of dicts + :returns: Single token spec dict per token with all balances + :rtype: list of dicts + """ + tokens = {} + for c in balances_collection: + for b in c: + address = b['address'] + if tokens.get(address) == None: + tokens[address] = { + 'address': address, + 'converters': b['converters'], + } + for k in b.keys(): + if k[:8] == 'balance_': + tokens[address][k] = b[k] + return list(tokens.values()) diff --git a/apps/cic-eth/cic_eth/queue/tx.py b/apps/cic-eth/cic_eth/queue/tx.py index 2159ee86..94ab14be 100644 --- a/apps/cic-eth/cic_eth/queue/tx.py +++ b/apps/cic-eth/cic_eth/queue/tx.py @@ -5,6 +5,7 @@ import datetime # third-party imports import celery +from hexathon import strip_0x from sqlalchemy import or_ from sqlalchemy import not_ from sqlalchemy import tuple_ @@ -12,6 +13,7 @@ from sqlalchemy import func # local imports from cic_registry import CICRegistry +from cic_registry.chain import ChainSpec from cic_eth.db.models.otx import Otx from cic_eth.db.models.otx import OtxStateLog from cic_eth.db.models.tx import TxCache @@ -22,6 +24,7 @@ from cic_eth.db.enum import ( LockEnum, StatusBits, is_alive, + dead, ) from cic_eth.eth.util import unpack_signed_raw_tx # TODO: should not be in same sub-path as package that imports queue.tx from cic_eth.error import NotLocalTxError @@ -687,5 +690,3 @@ def get_account_tx(address, as_sender=True, as_recipient=True, counterpart=None) return txs - - diff --git a/apps/cic-eth/tests/fixtures_celery.py b/apps/cic-eth/tests/fixtures_celery.py index 8d8a31bb..5d4f712d 100644 --- a/apps/cic-eth/tests/fixtures_celery.py +++ b/apps/cic-eth/tests/fixtures_celery.py @@ -17,6 +17,7 @@ def celery_includes(): 'cic_eth.eth.tx', 'cic_eth.ext.tx', 'cic_eth.queue.tx', + 'cic_eth.queue.balance', 'cic_eth.admin.ctrl', 'cic_eth.admin.nonce', 'cic_eth.eth.account', diff --git a/apps/cic-eth/tests/functional/test_app.py b/apps/cic-eth/tests/functional/test_app.py index 29426ea5..ff9a65ce 100644 --- a/apps/cic-eth/tests/functional/test_app.py +++ b/apps/cic-eth/tests/functional/test_app.py @@ -29,27 +29,6 @@ def test_account_api( assert t.successful() -def test_balance_api( - default_chain_spec, - default_chain_registry, - init_w3, - cic_registry, - init_database, - bancor_tokens, - bancor_registry, - celery_session_worker, - ): - - token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0]) - - api = Api(str(default_chain_spec), callback_param='balance', callback_task='cic_eth.callbacks.noop.noop', queue=None) - t = api.balance(init_w3.eth.accounts[2], token.symbol()) - t.get() - for r in t.collect(): - print(r) - assert t.successful() - - def test_transfer_api( default_chain_spec, init_w3, diff --git a/apps/cic-eth/tests/functional/test_balance.py b/apps/cic-eth/tests/functional/test_balance.py new file mode 100644 index 00000000..243c7423 --- /dev/null +++ b/apps/cic-eth/tests/functional/test_balance.py @@ -0,0 +1,40 @@ +# standard imports +import os +import logging + +# local imports +import web3 +from cic_eth.api.api_task import Api + +logg = logging.getLogger() + + +def test_balance_complex_api( + default_chain_spec, + init_database, + init_w3, + cic_registry, + dummy_token, + dummy_token_registered, + celery_session_worker, + init_eth_tester, + ): + + chain_str = str(default_chain_spec) + + api = Api(chain_str, queue=None, callback_param='foo') + + a = web3.Web3.toChecksumAddress('0x' + os.urandom(20).hex()) + t = api.balance(a, 'DUM') + t.get() + r = None + for c in t.collect(): + r = c[1] + assert t.successful() + logg.debug(r) + + assert r[0].get('balance_incoming') != None + assert r[0].get('balance_outgoing') != None + assert r[0].get('balance_network') != None + + logg.debug('r {}'.format(r)) diff --git a/apps/cic-eth/tests/tasks/test_balance_complex.py b/apps/cic-eth/tests/tasks/test_balance_complex.py new file mode 100644 index 00000000..bf4b2214 --- /dev/null +++ b/apps/cic-eth/tests/tasks/test_balance_complex.py @@ -0,0 +1,232 @@ +# standard imports +import logging + +# third-party imports +from cic_registry import CICRegistry +import celery + +# local imports +from cic_eth.eth.rpc import RpcClient +from cic_eth.db.models.otx import Otx +from cic_eth.eth.util import unpack_signed_raw_tx + +#logg = logging.getLogger(__name__) +logg = logging.getLogger() + + +def test_balance_complex( + default_chain_spec, + init_database, + init_w3, + cic_registry, + dummy_token_gifted, + celery_session_worker, + init_eth_tester, + ): + + chain_str = str(default_chain_spec) + token_data = { + 'address': dummy_token_gifted, + 'converters': [], + } + + tx_hashes = [] + for i in range(3): + s = celery.signature( + 'cic_eth.eth.token.transfer', + [ + [token_data], + init_w3.eth.accounts[0], + init_w3.eth.accounts[1], + 1000*(i+1), + chain_str, + ], + ) + t = s.apply_async() + t.get() + r = None + for c in t.collect(): + r = c[1] + assert t.successful() + tx_hashes.append(r) + + otx = Otx.load(r) + + s_send = celery.signature( + 'cic_eth.eth.tx.send', + [ + [otx.signed_tx], + chain_str, + ], + ) + t = s_send.apply_async() + t.get() + for r in t.collect(): + pass + assert t.successful() + init_eth_tester.mine_block() + + + # here insert block sync to get state of balance + + s_balance_base = celery.signature( + 'cic_eth.eth.token.balance', + [ + [token_data], + init_w3.eth.accounts[0], + chain_str, + ], + ) + + s_balance_out = celery.signature( + 'cic_eth.queue.balance.balance_outgoing', + [ + init_w3.eth.accounts[0], + chain_str, + ] + ) + + s_balance_in = celery.signature( + 'cic_eth.queue.balance.balance_incoming', + [ + init_w3.eth.accounts[0], + chain_str, + ] + ) + s_balance_out.link(s_balance_in) + s_balance_base.link(s_balance_out) + t = s_balance_base.apply_async() + t.get() + r = None + for c in t.collect(): + r = c[1] + assert t.successful() + + assert r[0]['balance_network'] > 0 + assert r[0]['balance_incoming'] == 0 + assert r[0]['balance_outgoing'] > 0 + + s_balance_base = celery.signature( + 'cic_eth.eth.token.balance', + [ + init_w3.eth.accounts[1], + chain_str, + ], + ) + + s_balance_out = celery.signature( + 'cic_eth.queue.balance.balance_outgoing', + [ + [token_data], + init_w3.eth.accounts[1], + chain_str, + ] + ) + + s_balance_in = celery.signature( + 'cic_eth.queue.balance.balance_incoming', + [ + init_w3.eth.accounts[1], + chain_str, + ] + ) + + s_balance_base.link(s_balance_in) + s_balance_out.link(s_balance_base) + t = s_balance_out.apply_async() + t.get() + r = None + for c in t.collect(): + r = c[1] + assert t.successful() + + assert r[0]['balance_network'] > 0 + assert r[0]['balance_incoming'] > 0 + assert r[0]['balance_outgoing'] == 0 + + # Set confirmed status in backend + for tx_hash in tx_hashes: + rcpt = init_w3.eth.getTransactionReceipt(tx_hash) + assert rcpt['status'] == 1 + otx = Otx.load(tx_hash, session=init_database) + otx.success(block=rcpt['blockNumber'], session=init_database) + init_database.add(otx) + init_database.commit() + + + s_balance_base = celery.signature( + 'cic_eth.eth.token.balance', + [ + init_w3.eth.accounts[1], + chain_str, + ], + ) + + s_balance_out = celery.signature( + 'cic_eth.queue.balance.balance_outgoing', + [ + [token_data], + init_w3.eth.accounts[1], + chain_str, + ] + ) + + s_balance_in = celery.signature( + 'cic_eth.queue.balance.balance_incoming', + [ + init_w3.eth.accounts[1], + chain_str, + ] + ) + + s_balance_base.link(s_balance_in) + s_balance_out.link(s_balance_base) + t = s_balance_out.apply_async() + t.get() + r = None + for c in t.collect(): + r = c[1] + assert t.successful() + assert r[0]['balance_network'] > 0 + assert r[0]['balance_incoming'] == 0 + assert r[0]['balance_outgoing'] == 0 + + + s_balance_base = celery.signature( + 'cic_eth.eth.token.balance', + [ + init_w3.eth.accounts[0], + chain_str, + ], + ) + + s_balance_out = celery.signature( + 'cic_eth.queue.balance.balance_outgoing', + [ + [token_data], + init_w3.eth.accounts[0], + chain_str, + ] + ) + + s_balance_in = celery.signature( + 'cic_eth.queue.balance.balance_incoming', + [ + init_w3.eth.accounts[0], + chain_str, + ] + ) + + s_balance_base.link(s_balance_in) + s_balance_out.link(s_balance_base) + t = s_balance_out.apply_async() + t.get() + r = None + for c in t.collect(): + r = c[1] + assert t.successful() + assert r[0]['balance_network'] > 0 + assert r[0]['balance_incoming'] == 0 + assert r[0]['balance_outgoing'] == 0 + + diff --git a/apps/cic-eth/tests/unit/queue/test_balances.py b/apps/cic-eth/tests/unit/queue/test_balances.py new file mode 100644 index 00000000..168401c9 --- /dev/null +++ b/apps/cic-eth/tests/unit/queue/test_balances.py @@ -0,0 +1,158 @@ +# standard imports +import os +import logging + +# third-party imports +import pytest + +# local imports +from cic_eth.db.models.otx import Otx +from cic_eth.db.models.tx import TxCache +from cic_eth.queue.balance import ( + balance_outgoing, + balance_incoming, + assemble_balances, + ) + +logg = logging.getLogger() + + +def test_assemble(): + + token_foo = '0x' + os.urandom(20).hex() + token_bar = '0x' + os.urandom(20).hex() + b = [ + [ + { + 'address': token_foo, + 'converters': [], + 'balance_foo': 42, + }, + { + 'address': token_bar, + 'converters': [], + 'balance_baz': 666, + }, + ], + [ + { + 'address': token_foo, + 'converters': [], + 'balance_bar': 13, + }, + + { + 'address': token_bar, + 'converters': [], + 'balance_xyzzy': 1337, + } + ] + ] + r = assemble_balances(b) + logg.debug('r {}'.format(r)) + + assert r[0]['address'] == token_foo + assert r[1]['address'] == token_bar + assert r[0].get('balance_foo') != None + assert r[0].get('balance_bar') != None + assert r[1].get('balance_baz') != None + assert r[1].get('balance_xyzzy') != None + + +@pytest.mark.skip() +def test_outgoing_balance( + default_chain_spec, + init_database, + ): + + chain_str = str(default_chain_spec) + recipient = '0x' + os.urandom(20).hex() + tx_hash = '0x' + os.urandom(32).hex() + signed_tx = '0x' + os.urandom(128).hex() + otx = Otx.add(0, recipient, tx_hash, signed_tx, session=init_database) + init_database.add(otx) + init_database.commit() + + token_address = '0x' + os.urandom(20).hex() + sender = '0x' + os.urandom(20).hex() + txc = TxCache( + tx_hash, + sender, + recipient, + token_address, + token_address, + 1000, + 1000, + ) + init_database.add(txc) + init_database.commit() + + token_data = { + 'address': token_address, + 'converters': [], + } + b = balance_outgoing([token_data], sender, chain_str) + assert b[0]['balance_outgoing'] == 1000 + + otx.sent(session=init_database) + init_database.commit() + + b = balance_outgoing([token_data], sender, chain_str) + assert b[0]['balance_outgoing'] == 1000 + + otx.success(block=1024, session=init_database) + init_database.commit() + + b = balance_outgoing([token_data], sender, chain_str) + assert b[0]['balance_outgoing'] == 0 + + +@pytest.mark.skip() +def test_incoming_balance( + default_chain_spec, + init_database, + ): + + chain_str = str(default_chain_spec) + recipient = '0x' + os.urandom(20).hex() + tx_hash = '0x' + os.urandom(32).hex() + signed_tx = '0x' + os.urandom(128).hex() + otx = Otx.add(0, recipient, tx_hash, signed_tx, session=init_database) + init_database.add(otx) + init_database.commit() + + token_address = '0x' + os.urandom(20).hex() + sender = '0x' + os.urandom(20).hex() + txc = TxCache( + tx_hash, + sender, + recipient, + token_address, + token_address, + 1000, + 1000, + ) + init_database.add(txc) + init_database.commit() + + token_data = { + 'address': token_address, + 'converters': [], + } + b = balance_incoming([token_data], recipient, chain_str) + assert b[0]['balance_incoming'] == 0 + + otx.sent(session=init_database) + init_database.commit() + + b = balance_incoming([token_data], recipient, chain_str) + assert b[0]['balance_incoming'] == 1000 + + otx.success(block=1024, session=init_database) + init_database.commit() + + b = balance_incoming([token_data], recipient, chain_str) + assert b[0]['balance_incoming'] == 0 + + +