# standard imports import sys import os import logging import argparse # external imports import confini from chainsyncer.backend.memory import MemBackend from chainsyncer.driver import ( HeadSyncer, HistorySyncer, ) from chainsyncer.filter import SyncFilter from chainsyncer.error import NoBlockForYou from chainlib.chain import ChainSpec from chainlib.eth.connection import EthHTTPConnection from chainlib.eth.block import block_latest # local imports from eth_stat_syncer.store import ( GasAggregator, RunStore, ) logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() default_config_dir = os.environ.get('CONFINI_DIR', './config') argparser = argparse.ArgumentParser() argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') argparser.add_argument('-c', '--config', dest='c', default=default_config_dir, type=str, help='rpc provider') argparser.add_argument('-i', '--chain-spec', dest='i', default='evm:ethereum:1', type=str, help='chain spec') argparser.add_argument('--start', type=int, help='number of blocks to sample at startup') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('-v', action='store_true', help='be verbose') argparser.add_argument('-vv', action='store_true', help='be more verbose') args = argparser.parse_args() if args.vv: logging.getLogger().setLevel(logging.DEBUG) elif args.v: logging.getLogger().setLevel(logging.INFO) config = confini.Config(args.c, args.env_prefix) config.process() # override args args_override = { 'CHAIN_SPEC': getattr(args, 'i'), 'RPC_PROVIDER': getattr(args, 'p'), } config.dict_override(args_override, 'cli flag') config.add(args.start, '_START', True) logg.debug('loaded config: {}\n'.format(config)) chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) conn = EthHTTPConnection(args.p) class GasPriceFilter(SyncFilter): def __init__(self, chain_spec, gas_aggregator): self.chain_spec = chain_spec self.gas_aggregator = gas_aggregator def filter(self, conn, block, tx, db_session): self.gas_aggregator.put(tx.gas_price) def main(): gas_store = RunStore(basedir=config.get('STORE_BASE_DIR')) gas_aggregator = GasAggregator(gas_store, 360) gas_filter = GasPriceFilter(chain_spec, gas_aggregator) o = block_latest() r = conn.do(o) n = int(r, 16) start_block = n logg.info('block height at start {}'.format(start_block)) if config.get('_START') != None: offset = start_block - config.get('_START') syncer_backend = MemBackend(chain_spec, None, target_block=start_block) syncer_backend.set(offset, 0) syncer = HistorySyncer(syncer_backend, block_callback=gas_aggregator.block_callback) syncer.add_filter(gas_filter) try: syncer.loop(0.0, conn) except NoBlockForYou: logg.info('history done at {}'.format(syncer.backend.get())) syncer_backend = MemBackend(chain_spec, None) syncer_backend.set(start_block + 1, 0) syncer = HeadSyncer(syncer_backend, block_callback=gas_aggregator.block_callback) syncer.add_filter(gas_filter) syncer.loop(1.0, conn) if __name__ == '__main__': main()