diff --git a/apps/cic-eth/cic_eth/api/api_admin.py b/apps/cic-eth/cic_eth/api/api_admin.py index f7c5fb8c..eb08934f 100644 --- a/apps/cic-eth/cic_eth/api/api_admin.py +++ b/apps/cic-eth/cic_eth/api/api_admin.py @@ -39,9 +39,8 @@ class AdminApi: :param queue: Name of worker queue to submit tasks to :type queue: str """ - def __init__(self, rpc_client, queue='cic-eth'): - self.rpc_client = rpc_client - self.w3 = rpc_client.w3 + def __init__(self, rpc, queue='cic-eth'): + self.rpc = rpc self.queue = queue @@ -80,7 +79,7 @@ class AdminApi: return s_lock.apply_async().get() - def tag_account(self, tag, address_hex): + def tag_account(self, tag, address_hex, chain_spec): """Persistently associate an address with a plaintext tag. Some tags are known by the system and is used to resolve addresses to use for certain transactions. @@ -91,14 +90,18 @@ class AdminApi: :type address_hex: str, 0x-hex :raises ValueError: Invalid checksum address """ - #if not web3.Web3.isChecksumAddress(address_hex): - if not to_checksum_address(address_hex): - raise ValueError('invalid address') - session = SessionBase.create_session() - role = AccountRole.set(tag, address_hex) - session.add(role) - session.commit() - session.close() + s_tag = celery.signature( + 'cic_eth.eth.account.set_role', + [ + tag, + address_hex, + chain_spec.asdict(), + ], + queue=self.queue, + ) + t = s_tag.apply_async() + logg.debug('taaag {}'.format(t)) + return t.get() def have_account(self, address_hex, chain_str): diff --git a/apps/cic-eth/cic_eth/eth/account.py b/apps/cic-eth/cic_eth/eth/account.py index 23073087..1242898b 100644 --- a/apps/cic-eth/cic_eth/eth/account.py +++ b/apps/cic-eth/cic_eth/eth/account.py @@ -230,10 +230,23 @@ def have(self, account, chain_spec_dict): logg.debug('cannot sign with {}: {}'.format(account, e)) conn.disconnect() return None - + + +@celery_app.task(bind=True, base=CriticalSQLAlchemyTask) +def set_role(self, tag, address, chain_spec_dict): + logg.debug('foo fooofoo') + if not to_checksum_address(address): + raise ValueError('invalid checksum address {}'.format(address)) + session = SessionBase.create_session() + role = AccountRole.set(tag, address, session=session) + session.add(role) + session.commit() + session.close() + return tag + @celery_app.task(bind=True, base=BaseTask) -def role(self, account, chain_str): +def role(self, address, chain_spec_dict): """Return account role for address :param account: Account to check @@ -244,7 +257,7 @@ def role(self, account, chain_str): :rtype: Varies """ session = self.create_session() - role_tag = AccountRole.role_for(account, session=session) + role_tag = AccountRole.role_for(address, session=session) session.close() return role_tag diff --git a/apps/cic-eth/cic_eth/runnable/create.py b/apps/cic-eth/cic_eth/runnable/create.py index 4c2b19cb..66ba619f 100644 --- a/apps/cic-eth/cic_eth/runnable/create.py +++ b/apps/cic-eth/cic_eth/runnable/create.py @@ -7,9 +7,9 @@ import json import argparse # external imports +import celery import confini import redis -import celery from xdg.BaseDirectory import xdg_config_home # local imports @@ -52,6 +52,7 @@ args_override = { 'REDIS_DB': getattr(args, 'redis_db'), } config.dict_override(args_override, 'cli') + celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) diff --git a/apps/cic-eth/cic_eth/runnable/tag.py b/apps/cic-eth/cic_eth/runnable/tag.py index c1df1e30..ea171303 100644 --- a/apps/cic-eth/cic_eth/runnable/tag.py +++ b/apps/cic-eth/cic_eth/runnable/tag.py @@ -5,14 +5,14 @@ import logging import argparse import re -# third-party imports -import web3 -from web3 import HTTPProvider, WebsocketProvider +# external imports +import celery import confini +from chainlib.chain import ChainSpec +from xdg.BaseDirectory import xdg_config_home # local imports from cic_eth.api import AdminApi -from cic_eth.eth import RpcClient from cic_eth.db import dsn_from_config from cic_eth.db.models.base import SessionBase @@ -48,29 +48,14 @@ config.censor('PASSWORD', 'DATABASE') config.censor('PASSWORD', 'SSL') logg.debug('config loaded from {}\n{}'.format(args.c, config)) +chain_spec = ChainSpec.from_chain_str(args.i) -dsn = dsn_from_config(config) -SessionBase.connect(dsn) +celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) -re_websocket = re.compile('^wss?://') -re_http = re.compile('^https?://') -blockchain_provider = config.get('ETH_PROVIDER') -if re.match(re_websocket, blockchain_provider) != None: - blockchain_provider = WebsocketProvider(blockchain_provider) -elif re.match(re_http, blockchain_provider) != None: - blockchain_provider = HTTPProvider(blockchain_provider) -else: - raise ValueError('unknown provider url {}'.format(blockchain_provider)) - -def web3_constructor(): - w3 = web3.Web3(blockchain_provider) - return (blockchain_provider, w3) -RpcClient.set_constructor(web3_constructor) -c = RpcClient(config.get('CIC_CHAIN_SPEC')) def main(): - api = AdminApi(c) - api.tag_account(args.tag, args.address) + api = AdminApi(None) + api.tag_account(args.tag, args.address, chain_spec) if __name__ == '__main__':