Make account setter use celery

This commit is contained in:
nolash 2021-03-24 17:57:18 +01:00
parent e64b1bf984
commit b93aa82bf7
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
4 changed files with 41 additions and 39 deletions

View File

@ -39,9 +39,8 @@ class AdminApi:
:param queue: Name of worker queue to submit tasks to :param queue: Name of worker queue to submit tasks to
:type queue: str :type queue: str
""" """
def __init__(self, rpc_client, queue='cic-eth'): def __init__(self, rpc, queue='cic-eth'):
self.rpc_client = rpc_client self.rpc = rpc
self.w3 = rpc_client.w3
self.queue = queue self.queue = queue
@ -80,7 +79,7 @@ class AdminApi:
return s_lock.apply_async().get() return s_lock.apply_async().get()
def tag_account(self, tag, address_hex): def tag_account(self, tag, address_hex, chain_spec):
"""Persistently associate an address with a plaintext tag. """Persistently associate an address with a plaintext tag.
Some tags are known by the system and is used to resolve addresses to use for certain transactions. Some tags are known by the system and is used to resolve addresses to use for certain transactions.
@ -91,14 +90,18 @@ class AdminApi:
:type address_hex: str, 0x-hex :type address_hex: str, 0x-hex
:raises ValueError: Invalid checksum address :raises ValueError: Invalid checksum address
""" """
#if not web3.Web3.isChecksumAddress(address_hex): s_tag = celery.signature(
if not to_checksum_address(address_hex): 'cic_eth.eth.account.set_role',
raise ValueError('invalid address') [
session = SessionBase.create_session() tag,
role = AccountRole.set(tag, address_hex) address_hex,
session.add(role) chain_spec.asdict(),
session.commit() ],
session.close() queue=self.queue,
)
t = s_tag.apply_async()
logg.debug('taaag {}'.format(t))
return t.get()
def have_account(self, address_hex, chain_str): def have_account(self, address_hex, chain_str):

View File

@ -230,10 +230,23 @@ def have(self, account, chain_spec_dict):
logg.debug('cannot sign with {}: {}'.format(account, e)) logg.debug('cannot sign with {}: {}'.format(account, e))
conn.disconnect() conn.disconnect()
return None return None
@celery_app.task(bind=True, base=CriticalSQLAlchemyTask)
def set_role(self, tag, address, chain_spec_dict):
logg.debug('foo fooofoo')
if not to_checksum_address(address):
raise ValueError('invalid checksum address {}'.format(address))
session = SessionBase.create_session()
role = AccountRole.set(tag, address, session=session)
session.add(role)
session.commit()
session.close()
return tag
@celery_app.task(bind=True, base=BaseTask) @celery_app.task(bind=True, base=BaseTask)
def role(self, account, chain_str): def role(self, address, chain_spec_dict):
"""Return account role for address """Return account role for address
:param account: Account to check :param account: Account to check
@ -244,7 +257,7 @@ def role(self, account, chain_str):
:rtype: Varies :rtype: Varies
""" """
session = self.create_session() session = self.create_session()
role_tag = AccountRole.role_for(account, session=session) role_tag = AccountRole.role_for(address, session=session)
session.close() session.close()
return role_tag return role_tag

View File

@ -7,9 +7,9 @@ import json
import argparse import argparse
# external imports # external imports
import celery
import confini import confini
import redis import redis
import celery
from xdg.BaseDirectory import xdg_config_home from xdg.BaseDirectory import xdg_config_home
# local imports # local imports
@ -52,6 +52,7 @@ args_override = {
'REDIS_DB': getattr(args, 'redis_db'), 'REDIS_DB': getattr(args, 'redis_db'),
} }
config.dict_override(args_override, 'cli') config.dict_override(args_override, 'cli')
celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))

View File

@ -5,14 +5,14 @@ import logging
import argparse import argparse
import re import re
# third-party imports # external imports
import web3 import celery
from web3 import HTTPProvider, WebsocketProvider
import confini import confini
from chainlib.chain import ChainSpec
from xdg.BaseDirectory import xdg_config_home
# local imports # local imports
from cic_eth.api import AdminApi from cic_eth.api import AdminApi
from cic_eth.eth import RpcClient
from cic_eth.db import dsn_from_config from cic_eth.db import dsn_from_config
from cic_eth.db.models.base import SessionBase from cic_eth.db.models.base import SessionBase
@ -48,29 +48,14 @@ config.censor('PASSWORD', 'DATABASE')
config.censor('PASSWORD', 'SSL') config.censor('PASSWORD', 'SSL')
logg.debug('config loaded from {}\n{}'.format(args.c, config)) logg.debug('config loaded from {}\n{}'.format(args.c, config))
chain_spec = ChainSpec.from_chain_str(args.i)
dsn = dsn_from_config(config) celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
SessionBase.connect(dsn)
re_websocket = re.compile('^wss?://')
re_http = re.compile('^https?://')
blockchain_provider = config.get('ETH_PROVIDER')
if re.match(re_websocket, blockchain_provider) != None:
blockchain_provider = WebsocketProvider(blockchain_provider)
elif re.match(re_http, blockchain_provider) != None:
blockchain_provider = HTTPProvider(blockchain_provider)
else:
raise ValueError('unknown provider url {}'.format(blockchain_provider))
def web3_constructor():
w3 = web3.Web3(blockchain_provider)
return (blockchain_provider, w3)
RpcClient.set_constructor(web3_constructor)
c = RpcClient(config.get('CIC_CHAIN_SPEC'))
def main(): def main():
api = AdminApi(c) api = AdminApi(None)
api.tag_account(args.tag, args.address) api.tag_account(args.tag, args.address, chain_spec)
if __name__ == '__main__': if __name__ == '__main__':