Add demurrage token task api

This commit is contained in:
nolash 2021-07-05 11:58:07 +02:00
parent 2f09ac9110
commit b5d6d80d8e
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
4 changed files with 108 additions and 43 deletions

View File

@ -0,0 +1,30 @@
# standard imports
import logging
# external imports
import celery
from cic_eth.api.base import ApiBase
app = celery.current_app
logg = logging.getLogger(__name__)
class Api(ApiBase):
def get_adjusted_balance(self, token_symbol, balance, timestamp):
s = celery.signature(
'cic_eth_aux.erc20_demurrage_token.get_adjusted_balance',
[
token_symbol,
1000,
timestamp,
],
queue=None,
)
if self.callback_param != None:
s.link(self.callback_success)
s.link.on_error(self.callback_error)
t = s.apply_async(queue=self.queue)
return t

View File

@ -11,6 +11,7 @@ from cic_eth_aux.erc20_demurrage_token import (
DemurrageCalculationTask, DemurrageCalculationTask,
aux_setup, aux_setup,
) )
from cic_eth_aux.erc20_demurrage_token.api import Api as AuxApi
logg = logging.getLogger() logg = logging.getLogger()
@ -19,7 +20,7 @@ def test_demurrage_calulate_task(
default_chain_spec, default_chain_spec,
eth_rpc, eth_rpc,
cic_registry, cic_registry,
celery_worker, celery_session_worker,
register_demurrage_token, register_demurrage_token,
demurrage_token_symbol, demurrage_token_symbol,
contract_roles, contract_roles,
@ -45,3 +46,24 @@ def test_demurrage_calulate_task(
r = t.get_leaf() r = t.get_leaf()
assert t.successful() assert t.successful()
assert r == 980 assert r == 980
def test_demurrage_calculate_api(
default_chain_spec,
eth_rpc,
cic_registry,
celery_session_worker,
register_demurrage_token,
demurrage_token_symbol,
contract_roles,
load_config,
):
api = AuxApi(str(default_chain_spec), queue=None)
since = datetime.datetime.utcnow() - datetime.timedelta(minutes=1)
t = api.get_adjusted_balance(demurrage_token_symbol, 1000, since.timestamp())
r = t.get_leaf()
assert t.successful()
assert r == 980

View File

@ -12,6 +12,7 @@ from cic_eth_registry import CICRegistry
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
# local imports # local imports
from cic_eth.api.base import ApiBase
from cic_eth.db.enum import LockEnum from cic_eth.db.enum import LockEnum
app = celery.current_app app = celery.current_app
@ -19,48 +20,8 @@ app = celery.current_app
logg = logging.getLogger(__name__) logg = logging.getLogger(__name__)
class Api: class Api(ApiBase):
"""Creates task chains to perform well-known CIC operations.
Each method that sends tasks returns details about the root task. The root task uuid can be provided in the callback, to enable to caller to correlate the result with individual calls. It can also be used to independently poll the completion of a task chain.
:param callback_param: Static value to pass to callback
:type callback_param: str
:param callback_task: Callback task that executes callback_param call. (Must be included by the celery worker)
:type callback_task: string
:param queue: Name of worker queue to submit tasks to
:type queue: str
"""
def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop.noop', callback_queue=None):
self.chain_str = chain_str
self.chain_spec = ChainSpec.from_chain_str(chain_str)
self.callback_param = callback_param
self.callback_task = callback_task
self.queue = queue
logg.debug('api using queue {}'.format(self.queue))
self.callback_success = None
self.callback_error = None
if callback_queue == None:
callback_queue=self.queue
if callback_param != None:
self.callback_success = celery.signature(
callback_task,
[
callback_param,
0,
],
queue=callback_queue,
)
self.callback_error = celery.signature(
callback_task,
[
callback_param,
1,
],
queue=callback_queue,
)
def default_token(self): def default_token(self):
s_token = celery.signature( s_token = celery.signature(

View File

@ -0,0 +1,52 @@
# standard imports
import logging
# external imports
import celery
from chainlib.chain import ChainSpec
logg = logging.getLogger(__name__)
class ApiBase:
"""Creates task chains to perform well-known CIC operations.
Each method that sends tasks returns details about the root task. The root task uuid can be provided in the callback, to enable to caller to correlate the result with individual calls. It can also be used to independently poll the completion of a task chain.
:param callback_param: Static value to pass to callback
:type callback_param: str
:param callback_task: Callback task that executes callback_param call. (Must be included by the celery worker)
:type callback_task: string
:param queue: Name of worker queue to submit tasks to
:type queue: str
"""
def __init__(self, chain_str, queue='cic-eth', callback_param=None, callback_task='cic_eth.callbacks.noop.noop', callback_queue=None):
self.chain_str = chain_str
self.chain_spec = ChainSpec.from_chain_str(chain_str)
self.callback_param = callback_param
self.callback_task = callback_task
self.queue = queue
logg.debug('api using queue {}'.format(self.queue))
self.callback_success = None
self.callback_error = None
if callback_queue == None:
callback_queue=self.queue
if callback_param != None:
self.callback_success = celery.signature(
callback_task,
[
callback_param,
0,
],
queue=callback_queue,
)
self.callback_error = celery.signature(
callback_task,
[
callback_param,
1,
],
queue=callback_queue,
)