Merge branch 'lash/complex-balance' into 'master'
cic-eth: Complex balance See merge request grassrootseconomics/cic-internal-integration!29
This commit is contained in:
		
						commit
						6c1382aac6
					
				@ -30,7 +30,7 @@ class Api:
 | 
			
		||||
    :param queue: Name of worker queue to submit tasks to
 | 
			
		||||
    :type queue: str
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop', callback_queue=None):
 | 
			
		||||
    def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop.noop', callback_queue=None):
 | 
			
		||||
        self.chain_str = chain_str
 | 
			
		||||
        self.chain_spec = ChainSpec.from_chain_str(chain_str)
 | 
			
		||||
        self.callback_param = callback_param
 | 
			
		||||
@ -301,13 +301,15 @@ class Api:
 | 
			
		||||
        return t
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def balance(self, address, token_symbol):
 | 
			
		||||
    def balance(self, address, token_symbol, include_pending=True):
 | 
			
		||||
        """Calls the provided callback with the current token balance of the given address.
 | 
			
		||||
 | 
			
		||||
        :param address: Ethereum address of holder
 | 
			
		||||
        :type address: str, 0x-hex
 | 
			
		||||
        :param token_symbol: ERC20 token symbol of token to send
 | 
			
		||||
        :type token_symbol: str
 | 
			
		||||
        :param include_pending: If set, will include transactions that have not yet been fully processed
 | 
			
		||||
        :type include_pending: bool
 | 
			
		||||
        :returns: uuid of root task
 | 
			
		||||
        :rtype: celery.Task
 | 
			
		||||
        """
 | 
			
		||||
@ -330,14 +332,45 @@ class Api:
 | 
			
		||||
                    ],
 | 
			
		||||
                queue=self.queue,
 | 
			
		||||
                )
 | 
			
		||||
        
 | 
			
		||||
        if self.callback_param != None:
 | 
			
		||||
            s_balance.link(self.callback_success)
 | 
			
		||||
            s_tokens.link(s_balance).on_error(self.callback_error)
 | 
			
		||||
        else:
 | 
			
		||||
            s_tokens.link(s_balance)
 | 
			
		||||
        s_result = celery.signature(
 | 
			
		||||
                'cic_eth.queue.balance.assemble_balances',
 | 
			
		||||
                [],
 | 
			
		||||
                queue=self.queue,
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        t = s_tokens.apply_async(queue=self.queue)
 | 
			
		||||
        last_in_chain = s_balance
 | 
			
		||||
        if include_pending:
 | 
			
		||||
            s_balance_incoming = celery.signature(
 | 
			
		||||
                    'cic_eth.queue.balance.balance_incoming',
 | 
			
		||||
                    [
 | 
			
		||||
                        address,
 | 
			
		||||
                        self.chain_str,
 | 
			
		||||
                        ],
 | 
			
		||||
                    queue=self.queue,
 | 
			
		||||
                    )
 | 
			
		||||
            s_balance_outgoing = celery.signature(
 | 
			
		||||
                    'cic_eth.queue.balance.balance_outgoing',
 | 
			
		||||
                    [
 | 
			
		||||
                        address,
 | 
			
		||||
                        self.chain_str,
 | 
			
		||||
                        ],
 | 
			
		||||
                    queue=self.queue,
 | 
			
		||||
                    )
 | 
			
		||||
            s_balance.link(s_balance_incoming)
 | 
			
		||||
            s_balance_incoming.link(s_balance_outgoing)
 | 
			
		||||
            last_in_chain = s_balance_outgoing
 | 
			
		||||
 | 
			
		||||
        one = celery.chain(s_tokens, s_balance)
 | 
			
		||||
        two = celery.chain(s_tokens, s_balance_incoming)
 | 
			
		||||
        three = celery.chain(s_tokens, s_balance_outgoing)
 | 
			
		||||
 | 
			
		||||
        t = None
 | 
			
		||||
        if self.callback_param != None:
 | 
			
		||||
            s_result.link(self.callback_success).on_error(self.callback_error)
 | 
			
		||||
            t = celery.chord([one, two, three])(s_result)
 | 
			
		||||
        else:
 | 
			
		||||
            t = celery.chord([one, two, three])(s_result)
 | 
			
		||||
    
 | 
			
		||||
        return t
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -18,4 +18,4 @@ def noop(self, result, param, status_code):
 | 
			
		||||
    :rtype: bool
 | 
			
		||||
    """
 | 
			
		||||
    logg.info('noop callback {} {} {}'.format(result, param, status_code))
 | 
			
		||||
    return True
 | 
			
		||||
    return result
 | 
			
		||||
 | 
			
		||||
@ -4,20 +4,23 @@ import enum
 | 
			
		||||
 | 
			
		||||
@enum.unique
 | 
			
		||||
class StatusBits(enum.IntEnum):
 | 
			
		||||
    QUEUED = 0x01
 | 
			
		||||
    IN_NETWORK = 0x08
 | 
			
		||||
    """Individual bit flags that are combined to define the state and legacy of a queued transaction
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    QUEUED = 0x01 # transaction should be sent to network
 | 
			
		||||
    IN_NETWORK = 0x08 # transaction is in network
 | 
			
		||||
   
 | 
			
		||||
    DEFERRED = 0x10
 | 
			
		||||
    GAS_ISSUES = 0x20
 | 
			
		||||
    DEFERRED = 0x10 # an attempt to send the transaction to network has failed
 | 
			
		||||
    GAS_ISSUES = 0x20 # transaction is pending sender account gas funding
 | 
			
		||||
 | 
			
		||||
    LOCAL_ERROR = 0x100
 | 
			
		||||
    NODE_ERROR = 0x200
 | 
			
		||||
    NETWORK_ERROR = 0x400
 | 
			
		||||
    UNKNOWN_ERROR = 0x800
 | 
			
		||||
    LOCAL_ERROR = 0x100 # errors that originate internally from the component
 | 
			
		||||
    NODE_ERROR = 0x200 # errors originating in the node (invalid RLP input...)
 | 
			
		||||
    NETWORK_ERROR = 0x400 # errors that originate from the network (REVERT)
 | 
			
		||||
    UNKNOWN_ERROR = 0x800 # unclassified errors (the should not occur)
 | 
			
		||||
 | 
			
		||||
    FINAL = 0x1000
 | 
			
		||||
    OBSOLETE = 0x2000
 | 
			
		||||
    MANUAL = 0x8000
 | 
			
		||||
    FINAL = 0x1000 # transaction processing has completed
 | 
			
		||||
    OBSOLETE = 0x2000 # transaction has been replaced by a different transaction with higher fee
 | 
			
		||||
    MANUAL = 0x8000 # transaction processing has been manually overridden
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@enum.unique
 | 
			
		||||
@ -79,6 +82,19 @@ class LockEnum(enum.IntEnum):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def status_str(v, bits_only=False):
 | 
			
		||||
    """Render a human-readable string describing the status
 | 
			
		||||
 | 
			
		||||
    If the bit field exactly matches a StatusEnum value, the StatusEnum label will be returned.
 | 
			
		||||
 | 
			
		||||
    If a StatusEnum cannot be matched, the string will be postfixed with "*", unless explicitly instructed to return bit field labels only.
 | 
			
		||||
 | 
			
		||||
    :param v: Status bit field
 | 
			
		||||
    :type v: number
 | 
			
		||||
    :param bits_only: Only render individual bit labels.
 | 
			
		||||
    :type bits_only: bool
 | 
			
		||||
    :returns: Status string
 | 
			
		||||
    :rtype: str
 | 
			
		||||
    """
 | 
			
		||||
    s = ''
 | 
			
		||||
    if not bits_only:
 | 
			
		||||
        try:
 | 
			
		||||
@ -100,14 +116,39 @@ def status_str(v, bits_only=False):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def all_errors():
 | 
			
		||||
    """Bit mask of all error states
 | 
			
		||||
 | 
			
		||||
    :returns: Error flags
 | 
			
		||||
    :rtype: number
 | 
			
		||||
    """
 | 
			
		||||
    return StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def is_error_status(v):
 | 
			
		||||
    """Check if value is an error state
 | 
			
		||||
 | 
			
		||||
    :param v: Status bit field
 | 
			
		||||
    :type v: number
 | 
			
		||||
    :returns: True if error
 | 
			
		||||
    :rtype: bool
 | 
			
		||||
    """
 | 
			
		||||
    return bool(v & all_errors())
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def dead():
 | 
			
		||||
    """Bit mask defining whether a transaction is still likely to be processed on the network.
 | 
			
		||||
 | 
			
		||||
    :returns: Bit mask
 | 
			
		||||
    :rtype: number
 | 
			
		||||
    """
 | 
			
		||||
    return StatusBits.FINAL | StatusBits.OBSOLETE
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def is_alive(v):
 | 
			
		||||
    return bool(v & (StatusBits.FINAL | StatusBits.OBSOLETE) == 0)
 | 
			
		||||
    """Check if transaction is still likely to be processed on the network.
 | 
			
		||||
 | 
			
		||||
    The contingency of "likely" refers to the case a transaction has been obsoleted after sent to the network, but the network still confirms the obsoleted transaction. The return value of this method will not change as a result of this, BUT the state itself will (as the FINAL bit will be set).
 | 
			
		||||
 | 
			
		||||
    :returns: 
 | 
			
		||||
    """
 | 
			
		||||
    return bool(v & dead() == 0)
 | 
			
		||||
 | 
			
		||||
@ -287,7 +287,6 @@ class Otx(SessionBase):
 | 
			
		||||
 | 
			
		||||
        self.__set_status(StatusBits.IN_NETWORK, session)
 | 
			
		||||
        self.__reset_status(StatusBits.DEFERRED | StatusBits.QUEUED | StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR, session)
 | 
			
		||||
        logg.debug('<<< status {}'.format(status_str(self.status)))
 | 
			
		||||
 | 
			
		||||
        if self.tracing:
 | 
			
		||||
            self.__state_log(session=session)
 | 
			
		||||
 | 
			
		||||
@ -187,7 +187,6 @@ def balance(tokens, holder_address, chain_str):
 | 
			
		||||
    """
 | 
			
		||||
    #abi = ContractRegistry.abi('ERC20Token')
 | 
			
		||||
    chain_spec = ChainSpec.from_chain_str(chain_str)
 | 
			
		||||
    balances = []
 | 
			
		||||
    c = RpcClient(chain_spec)
 | 
			
		||||
    for t in tokens:
 | 
			
		||||
        #token = CICRegistry.get_address(t['address'])
 | 
			
		||||
@ -195,9 +194,9 @@ def balance(tokens, holder_address, chain_str):
 | 
			
		||||
        #o = c.w3.eth.contract(abi=abi, address=t['address'])
 | 
			
		||||
        o = CICRegistry.get_address(chain_spec, t['address']).contract
 | 
			
		||||
        b = o.functions.balanceOf(holder_address).call()
 | 
			
		||||
        logg.debug('balance {} for {}: {}'.format(t['address'], holder_address, b))
 | 
			
		||||
        balances.append(b)
 | 
			
		||||
    return b
 | 
			
		||||
        t['balance_network'] = b
 | 
			
		||||
 | 
			
		||||
    return tokens
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@celery_app.task(bind=True)
 | 
			
		||||
@ -326,7 +325,7 @@ def resolve_tokens_by_symbol(token_symbols, chain_str):
 | 
			
		||||
        token = CICRegistry.get_token(chain_spec, token_symbol)
 | 
			
		||||
        tokens.append({
 | 
			
		||||
            'address': token.address(),
 | 
			
		||||
            #'converters': [],
 | 
			
		||||
            'converters': [],
 | 
			
		||||
            })
 | 
			
		||||
    return tokens
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -344,7 +344,6 @@ def send(self, txs, chain_str):
 | 
			
		||||
 | 
			
		||||
    chain_spec = ChainSpec.from_chain_str(chain_str)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    tx_hex = txs[0]
 | 
			
		||||
    logg.debug('send transaction {}'.format(tx_hex))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										120
									
								
								apps/cic-eth/cic_eth/queue/balance.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										120
									
								
								apps/cic-eth/cic_eth/queue/balance.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,120 @@
 | 
			
		||||
# standard imports
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
# third-party imports
 | 
			
		||||
import celery
 | 
			
		||||
from hexathon import strip_0x
 | 
			
		||||
 | 
			
		||||
# local imports
 | 
			
		||||
from cic_registry.chain import ChainSpec
 | 
			
		||||
from cic_eth.db import SessionBase
 | 
			
		||||
from cic_eth.db.models.otx import Otx
 | 
			
		||||
from cic_eth.db.models.tx import TxCache
 | 
			
		||||
from cic_eth.db.enum import (
 | 
			
		||||
        StatusBits,
 | 
			
		||||
        dead,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
celery_app = celery.current_app
 | 
			
		||||
 | 
			
		||||
logg = logging.getLogger()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def __balance_outgoing_compatible(token_address, holder_address, chain_str):
 | 
			
		||||
    session = SessionBase.create_session()
 | 
			
		||||
    q = session.query(TxCache.from_value)
 | 
			
		||||
    q = q.join(Otx)
 | 
			
		||||
    q = q.filter(TxCache.sender==holder_address)
 | 
			
		||||
    status_compare = dead()
 | 
			
		||||
    q = q.filter(Otx.status.op('&')(status_compare)==0)
 | 
			
		||||
    q = q.filter(TxCache.source_token_address==token_address)
 | 
			
		||||
    delta = 0
 | 
			
		||||
    for r in q.all():
 | 
			
		||||
        delta += int(r[0])
 | 
			
		||||
    session.close()
 | 
			
		||||
    return delta
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@celery_app.task()
 | 
			
		||||
def balance_outgoing(tokens, holder_address, chain_str):
 | 
			
		||||
    """Retrieve accumulated value of unprocessed transactions sent from the given address.
 | 
			
		||||
 | 
			
		||||
    :param tokens: list of token spec dicts with addresses to retrieve balances for
 | 
			
		||||
    :type tokens: list of str, 0x-hex
 | 
			
		||||
    :param holder_address: Sender address
 | 
			
		||||
    :type holder_address: str, 0x-hex
 | 
			
		||||
    :param chain_str: Chain spec string representation
 | 
			
		||||
    :type chain_str: str
 | 
			
		||||
    :returns: Tokens dicts with outgoing balance added
 | 
			
		||||
    :rtype: dict
 | 
			
		||||
    """
 | 
			
		||||
    chain_spec = ChainSpec.from_chain_str(chain_str)
 | 
			
		||||
    for t in tokens: 
 | 
			
		||||
        b = __balance_outgoing_compatible(t['address'], holder_address, chain_str)
 | 
			
		||||
        t['balance_outgoing'] = b
 | 
			
		||||
 | 
			
		||||
    return tokens
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def __balance_incoming_compatible(token_address, receiver_address, chain_str):
 | 
			
		||||
    session = SessionBase.create_session()
 | 
			
		||||
    q = session.query(TxCache.to_value)
 | 
			
		||||
    q = q.join(Otx)
 | 
			
		||||
    q = q.filter(TxCache.recipient==receiver_address)
 | 
			
		||||
    status_compare = dead()
 | 
			
		||||
    q = q.filter(Otx.status.op('&')(status_compare)==0)
 | 
			
		||||
    # TODO: this can change the result for the recipient if tx is later obsoleted and resubmission is delayed. 
 | 
			
		||||
    q = q.filter(Otx.status.op('&')(StatusBits.IN_NETWORK)==StatusBits.IN_NETWORK)
 | 
			
		||||
    q = q.filter(TxCache.destination_token_address==token_address)
 | 
			
		||||
    delta = 0
 | 
			
		||||
    for r in q.all():
 | 
			
		||||
        delta += int(r[0])
 | 
			
		||||
    session.close()
 | 
			
		||||
    return delta
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@celery_app.task()
 | 
			
		||||
def balance_incoming(tokens, receipient_address, chain_str):
 | 
			
		||||
    """Retrieve accumulated value of unprocessed transactions to be received by the given address.
 | 
			
		||||
 | 
			
		||||
    :param tokens: list of token spec dicts with addresses to retrieve balances for
 | 
			
		||||
    :type tokens: list of str, 0x-hex
 | 
			
		||||
    :param holder_address: Recipient address
 | 
			
		||||
    :type holder_address: str, 0x-hex
 | 
			
		||||
    :param chain_str: Chain spec string representation
 | 
			
		||||
    :type chain_str: str
 | 
			
		||||
    :returns: Tokens dicts with outgoing balance added
 | 
			
		||||
    :rtype: dict
 | 
			
		||||
    """
 | 
			
		||||
    chain_spec = ChainSpec.from_chain_str(chain_str)
 | 
			
		||||
    for t in tokens: 
 | 
			
		||||
        b = __balance_incoming_compatible(t['address'], receipient_address, chain_str)
 | 
			
		||||
        t['balance_incoming'] = b
 | 
			
		||||
 | 
			
		||||
    return tokens
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@celery_app.task()
 | 
			
		||||
def assemble_balances(balances_collection):
 | 
			
		||||
    """Combines token spec dicts with individual balances into a single token spec dict.
 | 
			
		||||
 | 
			
		||||
    A "balance" means any field that is keyed with a string starting with "balance_"
 | 
			
		||||
 | 
			
		||||
    :param balances_collection: Token spec dicts
 | 
			
		||||
    :type balances_collection: list of lists of dicts
 | 
			
		||||
    :returns: Single token spec dict per token with all balances
 | 
			
		||||
    :rtype: list of dicts
 | 
			
		||||
    """
 | 
			
		||||
    tokens = {}
 | 
			
		||||
    for c in balances_collection:
 | 
			
		||||
        for b in c:
 | 
			
		||||
            address = b['address']
 | 
			
		||||
            if tokens.get(address) == None:
 | 
			
		||||
                tokens[address] = {
 | 
			
		||||
                    'address': address,
 | 
			
		||||
                    'converters': b['converters'],
 | 
			
		||||
                        }
 | 
			
		||||
            for k in b.keys():
 | 
			
		||||
                if k[:8] == 'balance_':
 | 
			
		||||
                    tokens[address][k] = b[k]
 | 
			
		||||
    return list(tokens.values())
 | 
			
		||||
@ -5,6 +5,7 @@ import datetime
 | 
			
		||||
 | 
			
		||||
# third-party imports
 | 
			
		||||
import celery
 | 
			
		||||
from hexathon import strip_0x
 | 
			
		||||
from sqlalchemy import or_
 | 
			
		||||
from sqlalchemy import not_
 | 
			
		||||
from sqlalchemy import tuple_
 | 
			
		||||
@ -12,6 +13,7 @@ from sqlalchemy import func
 | 
			
		||||
 | 
			
		||||
# local imports
 | 
			
		||||
from cic_registry import CICRegistry
 | 
			
		||||
from cic_registry.chain import ChainSpec
 | 
			
		||||
from cic_eth.db.models.otx import Otx
 | 
			
		||||
from cic_eth.db.models.otx import OtxStateLog
 | 
			
		||||
from cic_eth.db.models.tx import TxCache
 | 
			
		||||
@ -22,6 +24,7 @@ from cic_eth.db.enum import (
 | 
			
		||||
        LockEnum,
 | 
			
		||||
        StatusBits,
 | 
			
		||||
        is_alive,
 | 
			
		||||
        dead,
 | 
			
		||||
        )
 | 
			
		||||
from cic_eth.eth.util import unpack_signed_raw_tx # TODO: should not be in same sub-path as package that imports queue.tx
 | 
			
		||||
from cic_eth.error import NotLocalTxError
 | 
			
		||||
@ -687,5 +690,3 @@ def get_account_tx(address, as_sender=True, as_recipient=True, counterpart=None)
 | 
			
		||||
 | 
			
		||||
    return txs
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -17,6 +17,7 @@ def celery_includes():
 | 
			
		||||
        'cic_eth.eth.tx',
 | 
			
		||||
        'cic_eth.ext.tx',
 | 
			
		||||
        'cic_eth.queue.tx',
 | 
			
		||||
        'cic_eth.queue.balance',
 | 
			
		||||
        'cic_eth.admin.ctrl',
 | 
			
		||||
        'cic_eth.admin.nonce',
 | 
			
		||||
        'cic_eth.eth.account',
 | 
			
		||||
 | 
			
		||||
@ -29,27 +29,6 @@ def test_account_api(
 | 
			
		||||
    assert t.successful()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_balance_api(
 | 
			
		||||
        default_chain_spec,
 | 
			
		||||
        default_chain_registry,
 | 
			
		||||
        init_w3,
 | 
			
		||||
        cic_registry,
 | 
			
		||||
        init_database,
 | 
			
		||||
        bancor_tokens,
 | 
			
		||||
        bancor_registry,
 | 
			
		||||
        celery_session_worker,
 | 
			
		||||
        ):
 | 
			
		||||
 | 
			
		||||
    token = CICRegistry.get_address(default_chain_spec, bancor_tokens[0])
 | 
			
		||||
 | 
			
		||||
    api = Api(str(default_chain_spec), callback_param='balance', callback_task='cic_eth.callbacks.noop.noop', queue=None)
 | 
			
		||||
    t = api.balance(init_w3.eth.accounts[2], token.symbol())
 | 
			
		||||
    t.get()
 | 
			
		||||
    for r in t.collect():
 | 
			
		||||
        print(r)
 | 
			
		||||
    assert t.successful()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_transfer_api(
 | 
			
		||||
        default_chain_spec,
 | 
			
		||||
        init_w3,
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										40
									
								
								apps/cic-eth/tests/functional/test_balance.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										40
									
								
								apps/cic-eth/tests/functional/test_balance.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,40 @@
 | 
			
		||||
# standard imports
 | 
			
		||||
import os
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
# local imports
 | 
			
		||||
import web3
 | 
			
		||||
from cic_eth.api.api_task import Api
 | 
			
		||||
 | 
			
		||||
logg = logging.getLogger()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_balance_complex_api(
 | 
			
		||||
        default_chain_spec,
 | 
			
		||||
        init_database,
 | 
			
		||||
        init_w3,
 | 
			
		||||
        cic_registry,
 | 
			
		||||
        dummy_token,
 | 
			
		||||
        dummy_token_registered,
 | 
			
		||||
        celery_session_worker,
 | 
			
		||||
        init_eth_tester,
 | 
			
		||||
    ):
 | 
			
		||||
 | 
			
		||||
    chain_str = str(default_chain_spec)
 | 
			
		||||
 | 
			
		||||
    api = Api(chain_str, queue=None, callback_param='foo')
 | 
			
		||||
 | 
			
		||||
    a = web3.Web3.toChecksumAddress('0x' + os.urandom(20).hex())
 | 
			
		||||
    t = api.balance(a, 'DUM')
 | 
			
		||||
    t.get()
 | 
			
		||||
    r = None
 | 
			
		||||
    for c in t.collect():
 | 
			
		||||
        r = c[1]
 | 
			
		||||
    assert t.successful()
 | 
			
		||||
    logg.debug(r)
 | 
			
		||||
 | 
			
		||||
    assert r[0].get('balance_incoming') != None
 | 
			
		||||
    assert r[0].get('balance_outgoing') != None
 | 
			
		||||
    assert r[0].get('balance_network') != None
 | 
			
		||||
 | 
			
		||||
    logg.debug('r {}'.format(r))
 | 
			
		||||
							
								
								
									
										232
									
								
								apps/cic-eth/tests/tasks/test_balance_complex.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										232
									
								
								apps/cic-eth/tests/tasks/test_balance_complex.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,232 @@
 | 
			
		||||
# standard imports
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
# third-party imports
 | 
			
		||||
from cic_registry import CICRegistry
 | 
			
		||||
import celery
 | 
			
		||||
 | 
			
		||||
# local imports
 | 
			
		||||
from cic_eth.eth.rpc import RpcClient
 | 
			
		||||
from cic_eth.db.models.otx import Otx
 | 
			
		||||
from cic_eth.eth.util import unpack_signed_raw_tx
 | 
			
		||||
 | 
			
		||||
#logg = logging.getLogger(__name__)
 | 
			
		||||
logg = logging.getLogger()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_balance_complex(
 | 
			
		||||
        default_chain_spec,
 | 
			
		||||
        init_database,
 | 
			
		||||
        init_w3,
 | 
			
		||||
        cic_registry,
 | 
			
		||||
        dummy_token_gifted,
 | 
			
		||||
        celery_session_worker,
 | 
			
		||||
        init_eth_tester,
 | 
			
		||||
        ):
 | 
			
		||||
 | 
			
		||||
    chain_str = str(default_chain_spec)
 | 
			
		||||
    token_data = {
 | 
			
		||||
            'address': dummy_token_gifted,
 | 
			
		||||
            'converters': [],
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
    tx_hashes = []
 | 
			
		||||
    for i in range(3):
 | 
			
		||||
        s = celery.signature(
 | 
			
		||||
                'cic_eth.eth.token.transfer',
 | 
			
		||||
                [
 | 
			
		||||
                    [token_data],
 | 
			
		||||
                    init_w3.eth.accounts[0],
 | 
			
		||||
                    init_w3.eth.accounts[1],
 | 
			
		||||
                    1000*(i+1),
 | 
			
		||||
                    chain_str,
 | 
			
		||||
                    ],
 | 
			
		||||
        )
 | 
			
		||||
        t = s.apply_async()
 | 
			
		||||
        t.get()
 | 
			
		||||
        r = None
 | 
			
		||||
        for c in t.collect():
 | 
			
		||||
            r = c[1]
 | 
			
		||||
        assert t.successful()
 | 
			
		||||
        tx_hashes.append(r)
 | 
			
		||||
 | 
			
		||||
        otx = Otx.load(r)
 | 
			
		||||
 | 
			
		||||
        s_send = celery.signature(
 | 
			
		||||
                'cic_eth.eth.tx.send',
 | 
			
		||||
                [
 | 
			
		||||
                    [otx.signed_tx],
 | 
			
		||||
                    chain_str,
 | 
			
		||||
                    ],
 | 
			
		||||
                )
 | 
			
		||||
        t = s_send.apply_async()
 | 
			
		||||
        t.get()
 | 
			
		||||
        for r in t.collect():
 | 
			
		||||
            pass
 | 
			
		||||
        assert t.successful()
 | 
			
		||||
        init_eth_tester.mine_block() 
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    # here insert block sync to get state of balance
 | 
			
		||||
 | 
			
		||||
    s_balance_base = celery.signature(
 | 
			
		||||
            'cic_eth.eth.token.balance',
 | 
			
		||||
            [
 | 
			
		||||
                [token_data],
 | 
			
		||||
                init_w3.eth.accounts[0],
 | 
			
		||||
                chain_str,
 | 
			
		||||
                ],
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    s_balance_out = celery.signature(
 | 
			
		||||
            'cic_eth.queue.balance.balance_outgoing',
 | 
			
		||||
            [
 | 
			
		||||
                init_w3.eth.accounts[0],
 | 
			
		||||
                chain_str,
 | 
			
		||||
                ]
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    s_balance_in = celery.signature(
 | 
			
		||||
            'cic_eth.queue.balance.balance_incoming',
 | 
			
		||||
            [
 | 
			
		||||
                init_w3.eth.accounts[0],
 | 
			
		||||
                chain_str,
 | 
			
		||||
                ]
 | 
			
		||||
        )
 | 
			
		||||
    s_balance_out.link(s_balance_in)
 | 
			
		||||
    s_balance_base.link(s_balance_out)
 | 
			
		||||
    t = s_balance_base.apply_async()
 | 
			
		||||
    t.get()
 | 
			
		||||
    r = None
 | 
			
		||||
    for c in t.collect():
 | 
			
		||||
        r = c[1]
 | 
			
		||||
    assert t.successful()
 | 
			
		||||
 | 
			
		||||
    assert r[0]['balance_network'] > 0
 | 
			
		||||
    assert r[0]['balance_incoming'] == 0
 | 
			
		||||
    assert r[0]['balance_outgoing'] > 0
 | 
			
		||||
 | 
			
		||||
    s_balance_base = celery.signature(
 | 
			
		||||
            'cic_eth.eth.token.balance',
 | 
			
		||||
            [
 | 
			
		||||
                init_w3.eth.accounts[1],
 | 
			
		||||
                chain_str,
 | 
			
		||||
                ],
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    s_balance_out = celery.signature(
 | 
			
		||||
            'cic_eth.queue.balance.balance_outgoing',
 | 
			
		||||
            [
 | 
			
		||||
                [token_data],
 | 
			
		||||
                init_w3.eth.accounts[1],
 | 
			
		||||
                chain_str,
 | 
			
		||||
                ]
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    s_balance_in = celery.signature(
 | 
			
		||||
            'cic_eth.queue.balance.balance_incoming',
 | 
			
		||||
            [
 | 
			
		||||
                init_w3.eth.accounts[1],
 | 
			
		||||
                chain_str,
 | 
			
		||||
                ]
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    s_balance_base.link(s_balance_in)
 | 
			
		||||
    s_balance_out.link(s_balance_base)
 | 
			
		||||
    t = s_balance_out.apply_async()
 | 
			
		||||
    t.get()
 | 
			
		||||
    r = None
 | 
			
		||||
    for c in t.collect():
 | 
			
		||||
        r = c[1]
 | 
			
		||||
    assert t.successful()
 | 
			
		||||
 | 
			
		||||
    assert r[0]['balance_network'] > 0
 | 
			
		||||
    assert r[0]['balance_incoming'] > 0
 | 
			
		||||
    assert r[0]['balance_outgoing'] == 0
 | 
			
		||||
 | 
			
		||||
    # Set confirmed status in backend
 | 
			
		||||
    for tx_hash in tx_hashes:
 | 
			
		||||
        rcpt = init_w3.eth.getTransactionReceipt(tx_hash)
 | 
			
		||||
        assert rcpt['status'] == 1
 | 
			
		||||
        otx = Otx.load(tx_hash, session=init_database)
 | 
			
		||||
        otx.success(block=rcpt['blockNumber'], session=init_database)
 | 
			
		||||
        init_database.add(otx)
 | 
			
		||||
    init_database.commit()
 | 
			
		||||
 | 
			
		||||
    
 | 
			
		||||
    s_balance_base = celery.signature(
 | 
			
		||||
            'cic_eth.eth.token.balance',
 | 
			
		||||
            [
 | 
			
		||||
                init_w3.eth.accounts[1],
 | 
			
		||||
                chain_str,
 | 
			
		||||
                ],
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    s_balance_out = celery.signature(
 | 
			
		||||
            'cic_eth.queue.balance.balance_outgoing',
 | 
			
		||||
            [
 | 
			
		||||
                [token_data],
 | 
			
		||||
                init_w3.eth.accounts[1],
 | 
			
		||||
                chain_str,
 | 
			
		||||
                ]
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    s_balance_in = celery.signature(
 | 
			
		||||
            'cic_eth.queue.balance.balance_incoming',
 | 
			
		||||
            [
 | 
			
		||||
                init_w3.eth.accounts[1],
 | 
			
		||||
                chain_str,
 | 
			
		||||
                ]
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    s_balance_base.link(s_balance_in)
 | 
			
		||||
    s_balance_out.link(s_balance_base)
 | 
			
		||||
    t = s_balance_out.apply_async()
 | 
			
		||||
    t.get()
 | 
			
		||||
    r = None
 | 
			
		||||
    for c in t.collect():
 | 
			
		||||
        r = c[1]
 | 
			
		||||
    assert t.successful()
 | 
			
		||||
    assert r[0]['balance_network'] > 0
 | 
			
		||||
    assert r[0]['balance_incoming'] == 0
 | 
			
		||||
    assert r[0]['balance_outgoing'] == 0
 | 
			
		||||
 | 
			
		||||
    
 | 
			
		||||
    s_balance_base = celery.signature(
 | 
			
		||||
            'cic_eth.eth.token.balance',
 | 
			
		||||
            [
 | 
			
		||||
                init_w3.eth.accounts[0],
 | 
			
		||||
                chain_str,
 | 
			
		||||
                ],
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    s_balance_out = celery.signature(
 | 
			
		||||
            'cic_eth.queue.balance.balance_outgoing',
 | 
			
		||||
            [
 | 
			
		||||
                [token_data],
 | 
			
		||||
                init_w3.eth.accounts[0],
 | 
			
		||||
                chain_str,
 | 
			
		||||
                ]
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    s_balance_in = celery.signature(
 | 
			
		||||
            'cic_eth.queue.balance.balance_incoming',
 | 
			
		||||
            [
 | 
			
		||||
                init_w3.eth.accounts[0],
 | 
			
		||||
                chain_str,
 | 
			
		||||
                ]
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    s_balance_base.link(s_balance_in)
 | 
			
		||||
    s_balance_out.link(s_balance_base)
 | 
			
		||||
    t = s_balance_out.apply_async()
 | 
			
		||||
    t.get()
 | 
			
		||||
    r = None
 | 
			
		||||
    for c in t.collect():
 | 
			
		||||
        r = c[1]
 | 
			
		||||
    assert t.successful()
 | 
			
		||||
    assert r[0]['balance_network'] > 0
 | 
			
		||||
    assert r[0]['balance_incoming'] == 0
 | 
			
		||||
    assert r[0]['balance_outgoing'] == 0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										158
									
								
								apps/cic-eth/tests/unit/queue/test_balances.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										158
									
								
								apps/cic-eth/tests/unit/queue/test_balances.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,158 @@
 | 
			
		||||
# standard imports
 | 
			
		||||
import os
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
# third-party imports
 | 
			
		||||
import pytest
 | 
			
		||||
 | 
			
		||||
# local imports
 | 
			
		||||
from cic_eth.db.models.otx import Otx
 | 
			
		||||
from cic_eth.db.models.tx import TxCache
 | 
			
		||||
from cic_eth.queue.balance import (
 | 
			
		||||
        balance_outgoing,
 | 
			
		||||
        balance_incoming,
 | 
			
		||||
        assemble_balances,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
logg = logging.getLogger()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_assemble():
 | 
			
		||||
   
 | 
			
		||||
    token_foo = '0x' + os.urandom(20).hex()
 | 
			
		||||
    token_bar = '0x' + os.urandom(20).hex()
 | 
			
		||||
    b = [
 | 
			
		||||
            [
 | 
			
		||||
        {
 | 
			
		||||
            'address': token_foo,
 | 
			
		||||
            'converters': [],
 | 
			
		||||
            'balance_foo': 42,
 | 
			
		||||
            },
 | 
			
		||||
                {
 | 
			
		||||
            'address': token_bar,
 | 
			
		||||
            'converters': [],
 | 
			
		||||
            'balance_baz': 666,
 | 
			
		||||
            },
 | 
			
		||||
                ],
 | 
			
		||||
            [
 | 
			
		||||
        {
 | 
			
		||||
            'address': token_foo,
 | 
			
		||||
            'converters': [],
 | 
			
		||||
            'balance_bar': 13,
 | 
			
		||||
            },
 | 
			
		||||
 | 
			
		||||
        {
 | 
			
		||||
            'address': token_bar,
 | 
			
		||||
            'converters': [],
 | 
			
		||||
            'balance_xyzzy': 1337,
 | 
			
		||||
            }
 | 
			
		||||
            ]
 | 
			
		||||
            ]
 | 
			
		||||
    r = assemble_balances(b)
 | 
			
		||||
    logg.debug('r {}'.format(r))
 | 
			
		||||
 | 
			
		||||
    assert r[0]['address'] == token_foo
 | 
			
		||||
    assert r[1]['address'] == token_bar
 | 
			
		||||
    assert r[0].get('balance_foo') != None
 | 
			
		||||
    assert r[0].get('balance_bar') != None
 | 
			
		||||
    assert r[1].get('balance_baz') != None
 | 
			
		||||
    assert r[1].get('balance_xyzzy') != None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.skip()
 | 
			
		||||
def test_outgoing_balance(
 | 
			
		||||
        default_chain_spec,
 | 
			
		||||
        init_database,
 | 
			
		||||
        ):
 | 
			
		||||
 | 
			
		||||
    chain_str = str(default_chain_spec)
 | 
			
		||||
    recipient = '0x' + os.urandom(20).hex()
 | 
			
		||||
    tx_hash = '0x' + os.urandom(32).hex()
 | 
			
		||||
    signed_tx = '0x' + os.urandom(128).hex()
 | 
			
		||||
    otx = Otx.add(0, recipient, tx_hash, signed_tx, session=init_database)
 | 
			
		||||
    init_database.add(otx)
 | 
			
		||||
    init_database.commit()
 | 
			
		||||
    
 | 
			
		||||
    token_address = '0x' + os.urandom(20).hex()
 | 
			
		||||
    sender = '0x' + os.urandom(20).hex()
 | 
			
		||||
    txc = TxCache(
 | 
			
		||||
            tx_hash,
 | 
			
		||||
            sender,
 | 
			
		||||
            recipient,
 | 
			
		||||
            token_address,
 | 
			
		||||
            token_address,
 | 
			
		||||
            1000,
 | 
			
		||||
            1000,
 | 
			
		||||
            )
 | 
			
		||||
    init_database.add(txc)
 | 
			
		||||
    init_database.commit()
 | 
			
		||||
 | 
			
		||||
    token_data = {
 | 
			
		||||
            'address': token_address,
 | 
			
		||||
            'converters': [],
 | 
			
		||||
            }
 | 
			
		||||
    b = balance_outgoing([token_data], sender, chain_str)
 | 
			
		||||
    assert b[0]['balance_outgoing'] == 1000
 | 
			
		||||
 | 
			
		||||
    otx.sent(session=init_database)
 | 
			
		||||
    init_database.commit()
 | 
			
		||||
 | 
			
		||||
    b = balance_outgoing([token_data], sender, chain_str)
 | 
			
		||||
    assert b[0]['balance_outgoing'] == 1000
 | 
			
		||||
    
 | 
			
		||||
    otx.success(block=1024, session=init_database)
 | 
			
		||||
    init_database.commit()
 | 
			
		||||
 | 
			
		||||
    b = balance_outgoing([token_data], sender, chain_str)
 | 
			
		||||
    assert b[0]['balance_outgoing'] == 0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.skip()
 | 
			
		||||
def test_incoming_balance(
 | 
			
		||||
        default_chain_spec,
 | 
			
		||||
        init_database,
 | 
			
		||||
        ):
 | 
			
		||||
 | 
			
		||||
    chain_str = str(default_chain_spec)
 | 
			
		||||
    recipient = '0x' + os.urandom(20).hex()
 | 
			
		||||
    tx_hash = '0x' + os.urandom(32).hex()
 | 
			
		||||
    signed_tx = '0x' + os.urandom(128).hex()
 | 
			
		||||
    otx = Otx.add(0, recipient, tx_hash, signed_tx, session=init_database)
 | 
			
		||||
    init_database.add(otx)
 | 
			
		||||
    init_database.commit()
 | 
			
		||||
    
 | 
			
		||||
    token_address = '0x' + os.urandom(20).hex()
 | 
			
		||||
    sender = '0x' + os.urandom(20).hex()
 | 
			
		||||
    txc = TxCache(
 | 
			
		||||
            tx_hash,
 | 
			
		||||
            sender,
 | 
			
		||||
            recipient,
 | 
			
		||||
            token_address,
 | 
			
		||||
            token_address,
 | 
			
		||||
            1000,
 | 
			
		||||
            1000,
 | 
			
		||||
            )
 | 
			
		||||
    init_database.add(txc)
 | 
			
		||||
    init_database.commit()
 | 
			
		||||
 | 
			
		||||
    token_data = {
 | 
			
		||||
            'address': token_address,
 | 
			
		||||
            'converters': [],
 | 
			
		||||
            }
 | 
			
		||||
    b = balance_incoming([token_data], recipient, chain_str)
 | 
			
		||||
    assert b[0]['balance_incoming'] == 0
 | 
			
		||||
 | 
			
		||||
    otx.sent(session=init_database)
 | 
			
		||||
    init_database.commit()
 | 
			
		||||
 | 
			
		||||
    b = balance_incoming([token_data], recipient, chain_str)
 | 
			
		||||
    assert b[0]['balance_incoming'] == 1000
 | 
			
		||||
   
 | 
			
		||||
    otx.success(block=1024, session=init_database)
 | 
			
		||||
    init_database.commit()
 | 
			
		||||
 | 
			
		||||
    b = balance_incoming([token_data], recipient, chain_str)
 | 
			
		||||
    assert b[0]['balance_incoming'] == 0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user