cic-stack/apps/data-seeding/cic_eth/traffic/traffic.py
2021-08-25 09:33:23 +00:00

132 lines
4.4 KiB
Python

# standard imports
import os
import logging
import re
import sys
import json
# external imports
import redis
import celery
from cic_eth_registry.registry import CICRegistry
from chainsyncer.backend.memory import MemBackend
from chainsyncer.driver.head import HeadSyncer
from chainlib.eth.connection import EthHTTPConnection
from chainlib.chain import ChainSpec
from chainlib.eth.gas import RPCGasOracle
from chainlib.eth.nonce import RPCNonceOracle
from chainlib.eth.block import block_latest
from hexathon import strip_0x
import chainlib.eth.cli
import cic_eth.cli
from cic_eth.cli.chain import chain_interface
from chainlib.eth.constant import ZERO_ADDRESS
# local imports
#import common
from cmd.traffic import (
TrafficItem,
TrafficRouter,
TrafficProvisioner,
TrafficSyncHandler,
)
from cmd.traffic import add_args as add_traffic_args
from cmd.cache import (
AccountRegistryCache,
TokenRegistryCache,
)
# common basics
script_dir = os.path.dirname(os.path.realpath(__file__))
traffic_schema_dir = os.path.join(script_dir, 'data', 'config')
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
arg_flags = cic_eth.cli.argflag_std_read | cic_eth.cli.Flag.WALLET
local_arg_flags = cic_eth.cli.argflag_local_taskcallback | cic_eth.cli.argflag_local_chain
argparser = cic_eth.cli.ArgumentParser(arg_flags)
argparser.add_argument('--batch-size', default=10, type=int, help='number of events to process simultaneously')
argparser.process_local_flags(local_arg_flags)
args = argparser.parse_args()
extra_args = {
'batch_size': None,
}
config = cic_eth.cli.Config.from_args(args, arg_flags, local_arg_flags, base_config_dir=traffic_schema_dir, extra_args=extra_args)
wallet = chainlib.eth.cli.Wallet()
wallet.from_config(config)
rpc = chainlib.eth.cli.Rpc(wallet=wallet)
conn = rpc.connect_by_config(config)
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
class NetworkError(Exception):
pass
def main():
# create signer (not currently in use, but needs to be accessible for custom traffic item generators)
signer = rpc.get_signer()
signer_address = rpc.get_sender_address()
# connect to celery
celery.Celery(broker=config.get('CELERY_BROKER_URL'), backend=config.get('CELERY_RESULT_URL'))
# set up registry
CICRegistry.address = config.get('CIC_REGISTRY_ADDRESS')
registry = CICRegistry(chain_spec, conn)
# Connect to blockchain with chainlib
gas_oracle = RPCGasOracle(conn)
nonce_oracle = RPCNonceOracle(signer_address, conn)
# Set up magic traffic handler
traffic_router = TrafficRouter()
traffic_router.apply_import_dict(config.all(), config)
handler = TrafficSyncHandler(config, traffic_router, conn)
# Set up syncer
syncer_backend = MemBackend(config.get('CHAIN_SPEC'), 0)
o = block_latest()
r = conn.do(o)
block_offset = int(strip_0x(r), 16) + 1
syncer_backend.set(block_offset, 0)
# get relevant registry entries
token_registry = registry.lookup('TokenRegistry')
if token_registry == ZERO_ADDRESS:
raise NetworkError('TokenRegistry value missing from contract registry {}'.format(config.get('CIC_REGISTRY_ADDRESS')))
logg.info('using token registry {}'.format(token_registry))
token_cache = TokenRegistryCache(chain_spec, token_registry)
account_registry = registry.lookup('AccountRegistry')
if account_registry == ZERO_ADDRESS:
raise NetworkError('AccountRegistry value missing from contract registry {}'.format(config.get('CIC_REGISTRY_ADDRESS')))
logg.info('using account registry {}'.format(account_registry))
account_cache = AccountRegistryCache(chain_spec, account_registry)
# Set up provisioner for common task input data
TrafficProvisioner.oracles['token'] = token_cache
TrafficProvisioner.oracles['account'] = account_cache
TrafficProvisioner.default_aux = {
'chain_spec': config.get('CHAIN_SPEC'),
'registry': registry,
'redis_host_callback': config.get('_REDIS_HOST_CALLBACK'),
'redis_port_callback': config.get('_REDIS_PORT_CALLBACK'),
'redis_db': config.get('REDIS_DB'),
'api_queue': config.get('CELERY_QUEUE'),
}
syncer = HeadSyncer(syncer_backend, chain_interface, block_callback=handler.refresh)
syncer.add_filter(handler)
syncer.loop(1, conn)
if __name__ == '__main__':
main()