From 3bf92e7a8a02944bb634f0f35cb9d1fd2507353c Mon Sep 17 00:00:00 2001 From: Louis Holbrook Date: Wed, 25 Aug 2021 09:33:23 +0000 Subject: [PATCH] Implement chainlib cli for traffic script --- .../cic_eth/runnable/daemons/dispatcher.py | 4 +- .../cic_eth/traffic/cmd/traffic.py | 7 +- .../cic_eth/traffic/config/celery.ini | 3 + .../cic_eth/traffic/config/redis.ini | 3 + .../cic_eth/traffic/config/rpc.ini | 2 + .../cic_eth/traffic/data/config/traffic.ini | 4 ++ apps/data-seeding/cic_eth/traffic/traffic.py | 71 ++++++++++--------- apps/data-seeding/config/traffic.ini | 4 -- apps/data-seeding/requirements.txt | 4 +- 9 files changed, 57 insertions(+), 45 deletions(-) create mode 100644 apps/data-seeding/cic_eth/traffic/config/celery.ini create mode 100644 apps/data-seeding/cic_eth/traffic/config/redis.ini create mode 100644 apps/data-seeding/cic_eth/traffic/config/rpc.ini create mode 100644 apps/data-seeding/cic_eth/traffic/data/config/traffic.ini delete mode 100644 apps/data-seeding/config/traffic.ini diff --git a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py index 64b51937..e904cacd 100644 --- a/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py +++ b/apps/cic-eth/cic_eth/runnable/daemons/dispatcher.py @@ -101,14 +101,14 @@ class DispatchSyncer: LockEnum.QUEUE, tx['from'], ], - queue=queue, + queue=config.get('CELERY_QUEUE'), ) s_send = celery.signature( 'cic_eth.eth.tx.send', [ self.chain_spec.asdict(), ], - queue=queue, + queue=config.get('CELERY_QUEUE'), ) s_check.link(s_send) t = s_check.apply_async() diff --git a/apps/data-seeding/cic_eth/traffic/cmd/traffic.py b/apps/data-seeding/cic_eth/traffic/cmd/traffic.py index 11893240..f8e456df 100644 --- a/apps/data-seeding/cic_eth/traffic/cmd/traffic.py +++ b/apps/data-seeding/cic_eth/traffic/cmd/traffic.py @@ -140,8 +140,11 @@ class TrafficRouter: for k in keys: if len(k) > 8 and k[:8] == 'TRAFFIC_': v = int(dct.get(k)) - self.add(k[8:].lower(), v) - logg.debug('found traffic item {} weight {}'.format(k, v)) + if v == 0: + logg.debug('skipping traffic item {} with weight {}'.format(k, v)) + else: + logg.debug('found traffic item {} weight {}'.format(k, v)) + self.add(k[8:].lower(), v) # TODO: This will not work well with big networks. The provisioner should use lazy loading and LRU instead. diff --git a/apps/data-seeding/cic_eth/traffic/config/celery.ini b/apps/data-seeding/cic_eth/traffic/config/celery.ini new file mode 100644 index 00000000..98c5012f --- /dev/null +++ b/apps/data-seeding/cic_eth/traffic/config/celery.ini @@ -0,0 +1,3 @@ +[celery] +broker_url = redis://localhost:63379 +result_url = redis://localhost:63379 diff --git a/apps/data-seeding/cic_eth/traffic/config/redis.ini b/apps/data-seeding/cic_eth/traffic/config/redis.ini new file mode 100644 index 00000000..e45d7ab2 --- /dev/null +++ b/apps/data-seeding/cic_eth/traffic/config/redis.ini @@ -0,0 +1,3 @@ +[redis] +host = localhost +port = 63379 diff --git a/apps/data-seeding/cic_eth/traffic/config/rpc.ini b/apps/data-seeding/cic_eth/traffic/config/rpc.ini new file mode 100644 index 00000000..d765756f --- /dev/null +++ b/apps/data-seeding/cic_eth/traffic/config/rpc.ini @@ -0,0 +1,2 @@ +[rpc] +http_provider = http://localhost:63545 diff --git a/apps/data-seeding/cic_eth/traffic/data/config/traffic.ini b/apps/data-seeding/cic_eth/traffic/data/config/traffic.ini new file mode 100644 index 00000000..68aa5ac7 --- /dev/null +++ b/apps/data-seeding/cic_eth/traffic/data/config/traffic.ini @@ -0,0 +1,4 @@ +[traffic] +#local.noop_traffic = 1 +local.account = 2 +local.transfer = 2 diff --git a/apps/data-seeding/cic_eth/traffic/traffic.py b/apps/data-seeding/cic_eth/traffic/traffic.py index 72a4a93c..03f4cae5 100644 --- a/apps/data-seeding/cic_eth/traffic/traffic.py +++ b/apps/data-seeding/cic_eth/traffic/traffic.py @@ -17,14 +17,10 @@ from chainlib.eth.gas import RPCGasOracle from chainlib.eth.nonce import RPCNonceOracle from chainlib.eth.block import block_latest from hexathon import strip_0x -from cic_base import ( - argparse, - config, - log, - rpc, - signer as signer_funcs, - ) -from cic_base.eth.syncer import chain_interface +import chainlib.eth.cli +import cic_eth.cli +from cic_eth.cli.chain import chain_interface +from chainlib.eth.constant import ZERO_ADDRESS # local imports #import common @@ -42,42 +38,45 @@ from cmd.cache import ( # common basics -script_dir = os.path.realpath(os.path.dirname(__file__)) -logg = log.create() -argparser = argparse.create(script_dir, argparse.full_template) -argparser = argparse.add(argparser, add_traffic_args, 'traffic') -args = argparse.parse(argparser, logg) -config = config.create(args.c, args, args.env_prefix) +script_dir = os.path.dirname(os.path.realpath(__file__)) +traffic_schema_dir = os.path.join(script_dir, 'data', 'config') +logging.basicConfig(level=logging.WARNING) +logg = logging.getLogger() -# map custom args to local config entries -batchsize = args.batch_size -if batchsize < 1: - batchsize = 1 -logg.info('batch size {}'.format(batchsize)) -config.add(batchsize, '_BATCH_SIZE', True) +arg_flags = cic_eth.cli.argflag_std_read | cic_eth.cli.Flag.WALLET +local_arg_flags = cic_eth.cli.argflag_local_taskcallback | cic_eth.cli.argflag_local_chain +argparser = cic_eth.cli.ArgumentParser(arg_flags) +argparser.add_argument('--batch-size', default=10, type=int, help='number of events to process simultaneously') +argparser.process_local_flags(local_arg_flags) +args = argparser.parse_args() -config.add(args.redis_host_callback, '_REDIS_HOST_CALLBACK', True) -config.add(args.redis_port_callback, '_REDIS_PORT_CALLBACK', True) +extra_args = { + 'batch_size': None, + } +config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags, base_config_dir=traffic_schema_dir, extra_args=extra_args) -config.add(args.y, '_KEYSTORE_FILE', True) +wallet = chainlib.eth.cli.Wallet() +wallet.from_config(config) -config.add(args.q, '_CELERY_QUEUE', True) +rpc = chainlib.eth.cli.Rpc(wallet=wallet) +conn = rpc.connect_by_config(config) -logg.debug(config) +chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) + + +class NetworkError(Exception): + pass -chain_spec = ChainSpec.from_chain_str(config.get('CIC_CHAIN_SPEC')) def main(): # create signer (not currently in use, but needs to be accessible for custom traffic item generators) - (signer_address, signer) = signer_funcs.from_keystore(config.get('_KEYSTORE_FILE')) + signer = rpc.get_signer() + signer_address = rpc.get_sender_address() # connect to celery celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL')) # set up registry - rpc.setup(config.get('CIC_CHAIN_SPEC'), config.get('ETH_PROVIDER')) # replace with HTTPConnection when registry has been so refactored - conn = EthHTTPConnection(config.get('ETH_PROVIDER')) - #registry = registry.init_legacy(config, w3) CICRegistry.address = config.get('CIC_REGISTRY_ADDRESS') registry = CICRegistry(chain_spec, conn) @@ -91,7 +90,7 @@ def main(): handler = TrafficSyncHandler(config, traffic_router, conn) # Set up syncer - syncer_backend = MemBackend(config.get('CIC_CHAIN_SPEC'), 0) + syncer_backend = MemBackend(config.get('CHAIN_SPEC'), 0) o = block_latest() r = conn.do(o) block_offset = int(strip_0x(r), 16) + 1 @@ -99,26 +98,28 @@ def main(): # get relevant registry entries token_registry = registry.lookup('TokenRegistry') + if token_registry == ZERO_ADDRESS: + raise NetworkError('TokenRegistry value missing from contract registry {}'.format(config.get('CIC_REGISTRY_ADDRESS'))) logg.info('using token registry {}'.format(token_registry)) token_cache = TokenRegistryCache(chain_spec, token_registry) account_registry = registry.lookup('AccountRegistry') + if account_registry == ZERO_ADDRESS: + raise NetworkError('AccountRegistry value missing from contract registry {}'.format(config.get('CIC_REGISTRY_ADDRESS'))) logg.info('using account registry {}'.format(account_registry)) account_cache = AccountRegistryCache(chain_spec, account_registry) # Set up provisioner for common task input data - #TrafficProvisioner.oracles['token']= common.registry.TokenOracle(w3, config.get('CIC_CHAIN_SPEC'), registry) - #TrafficProvisioner.oracles['account'] = common.registry.AccountsOracle(w3, config.get('CIC_CHAIN_SPEC'), registry) TrafficProvisioner.oracles['token'] = token_cache TrafficProvisioner.oracles['account'] = account_cache TrafficProvisioner.default_aux = { - 'chain_spec': config.get('CIC_CHAIN_SPEC'), + 'chain_spec': config.get('CHAIN_SPEC'), 'registry': registry, 'redis_host_callback': config.get('_REDIS_HOST_CALLBACK'), 'redis_port_callback': config.get('_REDIS_PORT_CALLBACK'), 'redis_db': config.get('REDIS_DB'), - 'api_queue': config.get('_CELERY_QUEUE'), + 'api_queue': config.get('CELERY_QUEUE'), } syncer = HeadSyncer(syncer_backend, chain_interface, block_callback=handler.refresh) diff --git a/apps/data-seeding/config/traffic.ini b/apps/data-seeding/config/traffic.ini deleted file mode 100644 index df89d1aa..00000000 --- a/apps/data-seeding/config/traffic.ini +++ /dev/null @@ -1,4 +0,0 @@ -[traffic] -#local.noop_traffic = 2 -#local.account = 2 -local.transfer = 2 diff --git a/apps/data-seeding/requirements.txt b/apps/data-seeding/requirements.txt index b4aad2ee..90c67b82 100644 --- a/apps/data-seeding/requirements.txt +++ b/apps/data-seeding/requirements.txt @@ -4,10 +4,10 @@ cic-types~=0.1.0a14 crypto-dev-signer>=0.4.15a1,<=0.4.15 faker==4.17.1 chainsyncer~=0.0.6a1 -chainlib-eth~=0.0.9a3 +chainlib-eth~=0.0.9a4 eth-address-index~=0.2.3a4 eth-contract-registry~=0.6.3a3 -eth-accounts-index~=0.1.2a2 +eth-accounts-index~=0.1.2a3 eth-erc20~=0.1.2a2 erc20-faucet~=0.3.2a2 psycopg2==2.8.6