diff --git a/apps/contract-migration/scripts/config/traffic.ini b/apps/contract-migration/scripts/config/traffic.ini index a95bf36d..27035a60 100644 --- a/apps/contract-migration/scripts/config/traffic.ini +++ b/apps/contract-migration/scripts/config/traffic.ini @@ -1,2 +1,3 @@ [traffic] -local.noop_traffic = 2 +#local.noop_traffic = 2 +local.account = 2 diff --git a/apps/contract-migration/scripts/local/account.py b/apps/contract-migration/scripts/local/account.py new file mode 100644 index 00000000..813c776c --- /dev/null +++ b/apps/contract-migration/scripts/local/account.py @@ -0,0 +1,25 @@ +# standard imports +import logging + +# external imports +from cic_eth.api.api_task import Api + +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() + +queue = 'cic-eth' +name = 'account' + + +def do(tokens, accounts, aux, block_number, tx_index): + + logg.debug('running {} {} {}'.format(__name__, tokens, accounts)) + api = Api( + str(aux['chain_spec']), + queue=queue, + callback_param='{}:{}:{}:{}'.format(aux['redis_host_callback'], aux['redis_port_callback'], aux['redis_db'], aux['redis_channel']), + callback_task='cic_eth.callbacks.redis.redis', + callback_queue=queue, + ) + t = api.create_account(register=True) + return (None, t,) diff --git a/apps/contract-migration/scripts/local/noop_traffic.py b/apps/contract-migration/scripts/local/noop_traffic.py index 7a7db240..961f807b 100644 --- a/apps/contract-migration/scripts/local/noop_traffic.py +++ b/apps/contract-migration/scripts/local/noop_traffic.py @@ -5,5 +5,7 @@ logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() -def do(config, tokens, accounts, block_number, tx_index): +def do(tokens, accounts, aux, block_number, tx_index): logg.debug('running {} {} {}'.format(__name__, tokens, accounts)) + + return (None, None,) diff --git a/apps/contract-migration/scripts/traffic.py b/apps/contract-migration/scripts/traffic.py index 2ed1eede..5c9f1dae 100644 --- a/apps/contract-migration/scripts/traffic.py +++ b/apps/contract-migration/scripts/traffic.py @@ -13,6 +13,7 @@ import random import redis import confini import web3 +import celery from cic_registry import CICRegistry from cic_registry.chain import ChainRegistry from chainlib.chain import ChainSpec @@ -42,15 +43,17 @@ default_data_dir = '/usr/local/share/cic/solidity/abi' argparser = argparse.ArgumentParser() argparser.add_argument('-p', type=str, help='Ethereum provider url') +argparser.add_argument('-r', type=str, help='cic-registry address') argparser.add_argument('-y', '--key-file', dest='y', type=str, help='Ethereum keystore file to use for signing') argparser.add_argument('-c', type=str, default='./config', help='config file') +argparser.add_argument('-q', type=str, default='cic-eth', help='celery queue to submit to') argparser.add_argument('-i', '--chain-spec', dest='i', type=str, help='chain spec') argparser.add_argument('-v', action='store_true', help='be verbose') argparser.add_argument('-vv', action='store_true', help='be more verbose') argparser.add_argument('--abi-dir', dest='abi_dir', type=str, help='Directory containing bytecode and abi') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') - -argparser.add_argument('-r', type=str, help='cic-registry address') +argparser.add_argument('--redis-host-callback', dest='redis_host_callback', default='localhost', type=str, help='redis host to use for callback') +argparser.add_argument('--redis-port-callback', dest='redis_port_callback', default=6379, type=int, help='redis port to use for callback') argparser.add_argument('--batch-size', dest='batch_size', default=10, type=int, help='number of events to process simultaneously') args = argparser.parse_args() @@ -80,6 +83,10 @@ if batchsize < 1: logg.info('batch size {}'.format(batchsize)) config.add(batchsize, '_BATCH_SIZE', True) +# redis task result callback +config.add(args.redis_host_callback, '_REDIS_HOST_CALLBACK', True) +config.add(args.redis_port_callback, '_REDIS_PORT_CALLBACK', True) + # signer keystore = DictKeystore() if args.y == None: @@ -106,12 +113,15 @@ elif re.match(re_http, config.get('ETH_PROVIDER')): blockchain_provider = web3.Web3.HTTPProvider(config.get('ETH_PROVIDER')) w3 = web3.Web3(blockchain_provider) +# connect celery +celery_app = celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) class TrafficItem: def __init__(self, item): self.method = item.do self.uuid = uuid.uuid4() + self.ext = None self.complete = False @@ -151,8 +161,8 @@ class TrafficRouter: return ti - def release(self, k): - del self.reserved[k] + def release(self, traffic_item): + del self.reserved[traffic_item.uuid] # parse traffic items @@ -173,6 +183,9 @@ class TrafficTasker: 'account': None, 'token': None, } + default_aux = { + } + def __init__(self): @@ -180,7 +193,7 @@ class TrafficTasker: self.accounts = self.oracles['account'].get_accounts() self.balances = {} - self.aux = {} + self.aux = copy.copy(self.default_aux) def load_balances(self): @@ -200,6 +213,69 @@ class TrafficTasker: +class Handler: + + def __init__(self, config, traffic_router): + self.traffic_router = traffic_router + self.redis_channel = str(uuid.uuid4()) + self.pubsub = self.__connect_redis(self.redis_channel, config) + self.traffic_items = {} + self.config = config + + + def __connect_redis(self, redis_channel, config): + r = redis.Redis(config.get('REDIS_HOST'), config.get('REDIS_PORT'), config.get('REDIS_DB')) + redis_pubsub = r.pubsub() + redis_pubsub.subscribe(redis_channel) + logg.debug('redis connected on channel {}'.format(redis_channel)) + return redis_pubsub + + + def refresh(self, block_number, tx_index): + + traffic_tasker = TrafficTasker() + traffic_tasker.add_aux('redis_channel', self.redis_channel) + + if len(traffic_tasker.tokens) == 0: + logg.error('patiently waiting for at least one registered token...') + return + + logg.debug('executing handler refresh with accouts {}'.format(traffic_tasker.accounts)) + logg.debug('executing handler refresh with tokens {}'.format(traffic_tasker.tokens)) + + while True: + traffic_item = traffic_router.reserve() + if traffic_item == None: + logg.debug('no traffic_items left to reserve {}'.format(traffic_item)) + break + (e, t,) = traffic_item.method(traffic_tasker.tokens, traffic_tasker.accounts, traffic_tasker.aux, block_number, tx_index) + if e != None: + logg.info('traffic method {} failed: {}'.format(traffic_item.name(), e)) + continue + if t == None: + logg.info('traffic method {} completed immediately') + self.traffic_router.release(traffic_item) + traffic_item.ext = t + self.traffic_items[traffic_item.uuid] = traffic_item + + + # TODO: add drain + while True: + m = self.pubsub.get_message(timeout=0.01) + if m == None: + break + logg.debug('redis message {}'.format(m)) + + + def name(self): + return 'traffic_item_handler' + + + def filter(self, conn, block, tx, session): + logg.debug('handler get {}'.format(tx)) + + + class TokenOracle: def __init__(self, chain_spec, registry): @@ -251,52 +327,6 @@ class AccountsOracle: return copy.copy(self.accounts) -class Handler: - - def __init__(self, config, chain_spec, registry, traffic_router): - self.traffic_router = traffic_router - self.redis_channel = str(uuid.uuid4()) - self.pubsub = self.__connect_redis(self.redis_channel, config) - - - def __connect_redis(self, redis_channel, config): - r = redis.Redis(config.get('REDIS_HOST'), config.get('REDIS_PORT'), config.get('REDIS_DB')) - redis_pubsub = r.pubsub() - redis_pubsub.subscribe(redis_channel) - logg.debug('redis connected on channel {}'.format(redis_channel)) - return redis_pubsub - - - def refresh(self, block_number, tx_index): - - token_tasker = TrafficTasker() - - if len(token_tasker.tokens) == 0: - logg.error('no tokens yet') - return - - while True: - item = traffic_router.reserve() - if item == None: - logg.debug('no items left to reserve {}'.format(item)) - break - item.method(config, token_tasker.tokens, token_tasker.accounts, block_number, tx_index) - - # TODO: add drain - while True: - m = self.pubsub.get_message(timeout=0.01) - if m == None: - break - - - def name(self): - return 'traffic_item_handler' - - - def filter(self, conn, block, tx, session): - logg.debug('handler get {}'.format(tx)) - - def main(local_config=None): if local_config != None: @@ -327,7 +357,7 @@ def main(local_config=None): nonce_oracle = DefaultNonceOracle(config.get('_SIGNER_ADDRESS'), conn) # Set up magic traffic handler - handler = Handler(config, chain_spec, CICRegistry, traffic_router) + handler = Handler(config, traffic_router) # Set up syncer @@ -337,9 +367,15 @@ def main(local_config=None): block_offset = int(strip_0x(r), 16) + 1 syncer_backend.set(block_offset, 0) - TrafficTasker.oracles['token']= TokenOracle(chain_spec, CICRegistry) TrafficTasker.oracles['account'] = AccountsOracle(chain_spec, CICRegistry) + TrafficTasker.default_aux = { + 'chain_spec': chain_spec, + 'registry': CICRegistry, + 'redis_host_callback': config.get('_REDIS_HOST_CALLBACK'), + 'redis_port_callback': config.get('_REDIS_PORT_CALLBACK'), + 'redis_db': config.get('REDIS_DB'), + } syncer = HeadSyncer(syncer_backend, loop_callback=handler.refresh) syncer.add_filter(handler) diff --git a/docker-compose.yml b/docker-compose.yml index 51695624..ac85dea7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -237,7 +237,7 @@ services: - -c - | if [[ -f /tmp/cic/config/.env ]]; then source /tmp/cic/config/.env; fi - ./start_tasker.sh -q cic-eth -v + ./start_tasker.sh -q cic-eth -vv # command: [/bin/sh, "./start_tasker.sh", -q, cic-eth, -vv ] cic-eth-manager-head: