cic-eth: Complex balance

This commit is contained in:
Louis Holbrook
2021-02-17 10:04:21 +00:00
parent c6bcda8832
commit ab7b5fbeb9
13 changed files with 654 additions and 52 deletions

View File

@@ -30,7 +30,7 @@ class Api:
:param queue: Name of worker queue to submit tasks to
:type queue: str
"""
def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop', callback_queue=None):
def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop.noop', callback_queue=None):
self.chain_str = chain_str
self.chain_spec = ChainSpec.from_chain_str(chain_str)
self.callback_param = callback_param
@@ -301,13 +301,15 @@ class Api:
return t
def balance(self, address, token_symbol):
def balance(self, address, token_symbol, include_pending=True):
"""Calls the provided callback with the current token balance of the given address.
:param address: Ethereum address of holder
:type address: str, 0x-hex
:param token_symbol: ERC20 token symbol of token to send
:type token_symbol: str
:param include_pending: If set, will include transactions that have not yet been fully processed
:type include_pending: bool
:returns: uuid of root task
:rtype: celery.Task
"""
@@ -330,14 +332,45 @@ class Api:
],
queue=self.queue,
)
if self.callback_param != None:
s_balance.link(self.callback_success)
s_tokens.link(s_balance).on_error(self.callback_error)
else:
s_tokens.link(s_balance)
s_result = celery.signature(
'cic_eth.queue.balance.assemble_balances',
[],
queue=self.queue,
)
t = s_tokens.apply_async(queue=self.queue)
last_in_chain = s_balance
if include_pending:
s_balance_incoming = celery.signature(
'cic_eth.queue.balance.balance_incoming',
[
address,
self.chain_str,
],
queue=self.queue,
)
s_balance_outgoing = celery.signature(
'cic_eth.queue.balance.balance_outgoing',
[
address,
self.chain_str,
],
queue=self.queue,
)
s_balance.link(s_balance_incoming)
s_balance_incoming.link(s_balance_outgoing)
last_in_chain = s_balance_outgoing
one = celery.chain(s_tokens, s_balance)
two = celery.chain(s_tokens, s_balance_incoming)
three = celery.chain(s_tokens, s_balance_outgoing)
t = None
if self.callback_param != None:
s_result.link(self.callback_success).on_error(self.callback_error)
t = celery.chord([one, two, three])(s_result)
else:
t = celery.chord([one, two, three])(s_result)
return t

View File

@@ -18,4 +18,4 @@ def noop(self, result, param, status_code):
:rtype: bool
"""
logg.info('noop callback {} {} {}'.format(result, param, status_code))
return True
return result

View File

@@ -4,20 +4,23 @@ import enum
@enum.unique
class StatusBits(enum.IntEnum):
QUEUED = 0x01
IN_NETWORK = 0x08
"""Individual bit flags that are combined to define the state and legacy of a queued transaction
"""
QUEUED = 0x01 # transaction should be sent to network
IN_NETWORK = 0x08 # transaction is in network
DEFERRED = 0x10
GAS_ISSUES = 0x20
DEFERRED = 0x10 # an attempt to send the transaction to network has failed
GAS_ISSUES = 0x20 # transaction is pending sender account gas funding
LOCAL_ERROR = 0x100
NODE_ERROR = 0x200
NETWORK_ERROR = 0x400
UNKNOWN_ERROR = 0x800
LOCAL_ERROR = 0x100 # errors that originate internally from the component
NODE_ERROR = 0x200 # errors originating in the node (invalid RLP input...)
NETWORK_ERROR = 0x400 # errors that originate from the network (REVERT)
UNKNOWN_ERROR = 0x800 # unclassified errors (the should not occur)
FINAL = 0x1000
OBSOLETE = 0x2000
MANUAL = 0x8000
FINAL = 0x1000 # transaction processing has completed
OBSOLETE = 0x2000 # transaction has been replaced by a different transaction with higher fee
MANUAL = 0x8000 # transaction processing has been manually overridden
@enum.unique
@@ -79,6 +82,19 @@ class LockEnum(enum.IntEnum):
def status_str(v, bits_only=False):
"""Render a human-readable string describing the status
If the bit field exactly matches a StatusEnum value, the StatusEnum label will be returned.
If a StatusEnum cannot be matched, the string will be postfixed with "*", unless explicitly instructed to return bit field labels only.
:param v: Status bit field
:type v: number
:param bits_only: Only render individual bit labels.
:type bits_only: bool
:returns: Status string
:rtype: str
"""
s = ''
if not bits_only:
try:
@@ -100,14 +116,39 @@ def status_str(v, bits_only=False):
def all_errors():
"""Bit mask of all error states
:returns: Error flags
:rtype: number
"""
return StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR | StatusBits.NETWORK_ERROR | StatusBits.UNKNOWN_ERROR
def is_error_status(v):
"""Check if value is an error state
:param v: Status bit field
:type v: number
:returns: True if error
:rtype: bool
"""
return bool(v & all_errors())
def dead():
"""Bit mask defining whether a transaction is still likely to be processed on the network.
:returns: Bit mask
:rtype: number
"""
return StatusBits.FINAL | StatusBits.OBSOLETE
def is_alive(v):
return bool(v & (StatusBits.FINAL | StatusBits.OBSOLETE) == 0)
"""Check if transaction is still likely to be processed on the network.
The contingency of "likely" refers to the case a transaction has been obsoleted after sent to the network, but the network still confirms the obsoleted transaction. The return value of this method will not change as a result of this, BUT the state itself will (as the FINAL bit will be set).
:returns:
"""
return bool(v & dead() == 0)

View File

@@ -287,7 +287,6 @@ class Otx(SessionBase):
self.__set_status(StatusBits.IN_NETWORK, session)
self.__reset_status(StatusBits.DEFERRED | StatusBits.QUEUED | StatusBits.LOCAL_ERROR | StatusBits.NODE_ERROR, session)
logg.debug('<<< status {}'.format(status_str(self.status)))
if self.tracing:
self.__state_log(session=session)

View File

@@ -187,7 +187,6 @@ def balance(tokens, holder_address, chain_str):
"""
#abi = ContractRegistry.abi('ERC20Token')
chain_spec = ChainSpec.from_chain_str(chain_str)
balances = []
c = RpcClient(chain_spec)
for t in tokens:
#token = CICRegistry.get_address(t['address'])
@@ -195,9 +194,9 @@ def balance(tokens, holder_address, chain_str):
#o = c.w3.eth.contract(abi=abi, address=t['address'])
o = CICRegistry.get_address(chain_spec, t['address']).contract
b = o.functions.balanceOf(holder_address).call()
logg.debug('balance {} for {}: {}'.format(t['address'], holder_address, b))
balances.append(b)
return b
t['balance_network'] = b
return tokens
@celery_app.task(bind=True)
@@ -326,7 +325,7 @@ def resolve_tokens_by_symbol(token_symbols, chain_str):
token = CICRegistry.get_token(chain_spec, token_symbol)
tokens.append({
'address': token.address(),
#'converters': [],
'converters': [],
})
return tokens

View File

@@ -344,7 +344,6 @@ def send(self, txs, chain_str):
chain_spec = ChainSpec.from_chain_str(chain_str)
tx_hex = txs[0]
logg.debug('send transaction {}'.format(tx_hex))

View File

@@ -0,0 +1,120 @@
# standard imports
import logging
# third-party imports
import celery
from hexathon import strip_0x
# local imports
from cic_registry.chain import ChainSpec
from cic_eth.db import SessionBase
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.tx import TxCache
from cic_eth.db.enum import (
StatusBits,
dead,
)
celery_app = celery.current_app
logg = logging.getLogger()
def __balance_outgoing_compatible(token_address, holder_address, chain_str):
session = SessionBase.create_session()
q = session.query(TxCache.from_value)
q = q.join(Otx)
q = q.filter(TxCache.sender==holder_address)
status_compare = dead()
q = q.filter(Otx.status.op('&')(status_compare)==0)
q = q.filter(TxCache.source_token_address==token_address)
delta = 0
for r in q.all():
delta += int(r[0])
session.close()
return delta
@celery_app.task()
def balance_outgoing(tokens, holder_address, chain_str):
"""Retrieve accumulated value of unprocessed transactions sent from the given address.
:param tokens: list of token spec dicts with addresses to retrieve balances for
:type tokens: list of str, 0x-hex
:param holder_address: Sender address
:type holder_address: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:returns: Tokens dicts with outgoing balance added
:rtype: dict
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
for t in tokens:
b = __balance_outgoing_compatible(t['address'], holder_address, chain_str)
t['balance_outgoing'] = b
return tokens
def __balance_incoming_compatible(token_address, receiver_address, chain_str):
session = SessionBase.create_session()
q = session.query(TxCache.to_value)
q = q.join(Otx)
q = q.filter(TxCache.recipient==receiver_address)
status_compare = dead()
q = q.filter(Otx.status.op('&')(status_compare)==0)
# TODO: this can change the result for the recipient if tx is later obsoleted and resubmission is delayed.
q = q.filter(Otx.status.op('&')(StatusBits.IN_NETWORK)==StatusBits.IN_NETWORK)
q = q.filter(TxCache.destination_token_address==token_address)
delta = 0
for r in q.all():
delta += int(r[0])
session.close()
return delta
@celery_app.task()
def balance_incoming(tokens, receipient_address, chain_str):
"""Retrieve accumulated value of unprocessed transactions to be received by the given address.
:param tokens: list of token spec dicts with addresses to retrieve balances for
:type tokens: list of str, 0x-hex
:param holder_address: Recipient address
:type holder_address: str, 0x-hex
:param chain_str: Chain spec string representation
:type chain_str: str
:returns: Tokens dicts with outgoing balance added
:rtype: dict
"""
chain_spec = ChainSpec.from_chain_str(chain_str)
for t in tokens:
b = __balance_incoming_compatible(t['address'], receipient_address, chain_str)
t['balance_incoming'] = b
return tokens
@celery_app.task()
def assemble_balances(balances_collection):
"""Combines token spec dicts with individual balances into a single token spec dict.
A "balance" means any field that is keyed with a string starting with "balance_"
:param balances_collection: Token spec dicts
:type balances_collection: list of lists of dicts
:returns: Single token spec dict per token with all balances
:rtype: list of dicts
"""
tokens = {}
for c in balances_collection:
for b in c:
address = b['address']
if tokens.get(address) == None:
tokens[address] = {
'address': address,
'converters': b['converters'],
}
for k in b.keys():
if k[:8] == 'balance_':
tokens[address][k] = b[k]
return list(tokens.values())

View File

@@ -5,6 +5,7 @@ import datetime
# third-party imports
import celery
from hexathon import strip_0x
from sqlalchemy import or_
from sqlalchemy import not_
from sqlalchemy import tuple_
@@ -12,6 +13,7 @@ from sqlalchemy import func
# local imports
from cic_registry import CICRegistry
from cic_registry.chain import ChainSpec
from cic_eth.db.models.otx import Otx
from cic_eth.db.models.otx import OtxStateLog
from cic_eth.db.models.tx import TxCache
@@ -22,6 +24,7 @@ from cic_eth.db.enum import (
LockEnum,
StatusBits,
is_alive,
dead,
)
from cic_eth.eth.util import unpack_signed_raw_tx # TODO: should not be in same sub-path as package that imports queue.tx
from cic_eth.error import NotLocalTxError
@@ -687,5 +690,3 @@ def get_account_tx(address, as_sender=True, as_recipient=True, counterpart=None)
return txs