1e7fff0133
- Renames s_assemble to s_brief - Link s_local to s_brief
91 lines
3.1 KiB
Python
91 lines
3.1 KiB
Python
# standard imports
|
|
import json
|
|
import logging
|
|
from typing import Union
|
|
|
|
# third-party imports
|
|
import celery
|
|
from cic_eth.api import Api
|
|
|
|
# local imports
|
|
from cic_ussd.error import CachedDataNotFoundError
|
|
from cic_ussd.redis import create_cached_data_key, get_cached_data
|
|
from cic_ussd.conversions import from_wei
|
|
|
|
logg = logging.getLogger()
|
|
|
|
|
|
class BalanceManager:
|
|
|
|
def __init__(self, address: str, chain_str: str, token_symbol: str):
|
|
"""
|
|
:param address: Ethereum address of account whose balance is being queried
|
|
:type address: str, 0x-hex
|
|
:param chain_str: The chain name and network id.
|
|
:type chain_str: str
|
|
:param token_symbol: ERC20 token symbol of whose balance is being queried
|
|
:type token_symbol: str
|
|
"""
|
|
self.address = address
|
|
self.chain_str = chain_str
|
|
self.token_symbol = token_symbol
|
|
|
|
def get_balances(self, asynchronous: bool = False) -> Union[celery.Task, dict]:
|
|
"""
|
|
This function queries cic-eth for an account's balances, It provides a means to receive the balance either
|
|
asynchronously or synchronously depending on the provided value for teh asynchronous parameter. It returns a
|
|
dictionary containing network, outgoing and incoming balances.
|
|
:param asynchronous: Boolean value checking whether to return balances asynchronously
|
|
:type asynchronous: bool
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
if asynchronous:
|
|
cic_eth_api = Api(
|
|
chain_str=self.chain_str,
|
|
callback_queue='cic-ussd',
|
|
callback_task='cic_ussd.tasks.callback_handler.process_balances_callback',
|
|
callback_param=''
|
|
)
|
|
cic_eth_api.balance(address=self.address, token_symbol=self.token_symbol)
|
|
else:
|
|
cic_eth_api = Api(chain_str=self.chain_str)
|
|
balance_request_task = cic_eth_api.balance(
|
|
address=self.address,
|
|
token_symbol=self.token_symbol)
|
|
return balance_request_task.get()[0]
|
|
|
|
|
|
def compute_operational_balance(balances: dict) -> float:
|
|
"""This function calculates the right balance given incoming and outgoing
|
|
:param balances:
|
|
:type balances:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
incoming_balance = balances.get('balance_incoming')
|
|
outgoing_balance = balances.get('balance_outgoing')
|
|
network_balance = balances.get('balance_network')
|
|
|
|
operational_balance = (network_balance + incoming_balance) - outgoing_balance
|
|
return from_wei(value=operational_balance)
|
|
|
|
|
|
def get_cached_operational_balance(blockchain_address: str):
|
|
"""
|
|
:param blockchain_address:
|
|
:type blockchain_address:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
key = create_cached_data_key(
|
|
identifier=bytes.fromhex(blockchain_address[2:]),
|
|
salt='cic.balances_data'
|
|
)
|
|
cached_balance = get_cached_data(key=key)
|
|
if cached_balance:
|
|
operational_balance = compute_operational_balance(balances=json.loads(cached_balance))
|
|
return operational_balance
|
|
else:
|
|
raise CachedDataNotFoundError('Cached operational balance not found.')
|