# eth-stat-syncer/eth_stat_syncer/runnable/tracker.py

# 122 lines
# 3.8 KiB
# Python

# standard imports
import sys
import os
import logging
import argparse
# external imports
import confini
from chainsyncer.backend.memory import MemBackend
from chainsyncer.driver.head import HeadSyncer
from chainsyncer.driver.history import HistorySyncer
from chainsyncer.filter import SyncFilter
from chainsyncer.error import NoBlockForYou
from chainlib.chain import ChainSpec
from chainlib.eth.connection import EthHTTPConnection
from chainlib.interface import ChainInterface
from chainlib.eth.block import (
block_by_number,
Block,
block_latest,
)
from chainlib.eth.tx import (
receipt,
Tx,
)
# local imports
from eth_stat_syncer.store import (
GasAggregator,
RunStore,
)
logging.basicConfig(level=logging.WARNING)
# Root logger: the verbosity flags handled below change the level for every module.
logg = logging.getLogger()

# Default configuration directory; the CONFINI_DIR environment variable wins over ./config.
default_config_dir = os.environ.get('CONFINI_DIR', './config')

argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
# The help text here used to be a copy of the -p help ('rpc provider').
argparser.add_argument('-c', '--config', dest='c', default=default_config_dir, type=str, help='configuration directory')
argparser.add_argument('-i', '--chain-spec', dest='i', default='evm:ethereum:1', type=str, help='chain spec')
argparser.add_argument('--start', type=int, help='number of blocks to sample at startup')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()

# -vv takes precedence over -v when both are given; with neither, WARNING stays in effect.
if args.vv:
    logg.setLevel(logging.DEBUG)
elif args.v:
    logg.setLevel(logging.INFO)
# Build the configuration from the directory given with -c and the optional
# environment prefix given with --env-prefix.
config = confini.Config(args.c, args.env_prefix)
config.process()

# override args
args_override = {
    'CHAIN_SPEC': args.i,
    'RPC_PROVIDER': args.p,
}
config.dict_override(args_override, 'cli flag')
# The --start value is stored under the '_START' key (None when the flag is absent).
config.add(args.start, '_START', True)
logg.debug('loaded config: {}\n'.format(config))

chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))

# Use the -p flag when it was given, otherwise fall back to the configured
# provider. Passing args.p unconditionally handed None to the connection
# whenever the provider was supplied only through the config or environment.
rpc_provider = args.p
if rpc_provider is None:
    rpc_provider = config.get('RPC_PROVIDER')
conn = EthHTTPConnection(rpc_provider)
class GasPriceFilter(SyncFilter):
    """Filter that forwards the gas price of each transaction to an aggregator."""

    def __init__(self, chain_spec, gas_aggregator):
        """Keep references to the chain spec and the aggregator.

        :param chain_spec: Chain spec; stored on the instance but not read by this class.
        :param gas_aggregator: Object whose ``put`` method receives each gas price.
        """
        # NOTE(review): the parent constructor is not invoked here - confirm that is intended.
        self.gas_aggregator = gas_aggregator
        self.chain_spec = chain_spec

    def filter(self, conn, block, tx, db_session):
        """Record the gas price of a single transaction.

        Only ``tx`` is used; ``conn``, ``block`` and ``db_session`` are
        accepted but ignored. Nothing is returned.

        :param tx: Transaction object exposing a ``gas_price`` attribute.
        """
        gas_price = tx.gas_price
        self.gas_aggregator.put(gas_price)
class EthChainInterface(ChainInterface):
    """Chain interface wired to the chainlib Ethereum block and tx helpers."""

    def __init__(self):
        """Bind the Ethereum implementations to the interface attributes.

        Presumably the syncers look these attributes up through the
        ChainInterface base class - verify against chainlib.
        """
        # NOTE(review): the parent constructor is not invoked here - confirm that is intended.
        # Transaction-related hooks.
        self._src_normalize = Tx.src_normalize
        self._tx_receipt = receipt
        # Block-related hooks.
        self._block_from_src = Block.from_src
        self._block_by_number = block_by_number
def main():
    """Collect gas prices, optionally from recent history, then from the chain head.

    Reads the module-level ``config``, ``chain_spec`` and ``conn``. The current
    block height is fetched once at startup. When a ``_START`` value is
    configured, a history syncer first processes the blocks from
    ``height - _START`` (never below block 0) up to that height. After that a
    head syncer is started from ``height + 1``.
    """
    gas_store = RunStore(basedir=config.get('STORE_BASE_DIR'))
    # NOTE(review): 360 is presumably the aggregation window size - confirm against GasAggregator.
    gas_aggregator = GasAggregator(gas_store, 360)
    gas_filter = GasPriceFilter(chain_spec, gas_aggregator)

    # The node answers the latest-block query with a hex string.
    o = block_latest()
    r = conn.do(o)
    start_block = int(r, 16)
    logg.info('block height at start {}'.format(start_block))

    chain_interface = EthChainInterface()

    history_count = config.get('_START')
    if history_count is not None:
        # Clamp at block 0: asking for more blocks than the chain holds would
        # otherwise give a negative starting block number.
        offset = max(0, start_block - history_count)
        syncer_backend = MemBackend.custom(chain_spec, start_block)
        syncer_backend.set(offset, 0)
        syncer = HistorySyncer(syncer_backend, chain_interface, block_callback=gas_aggregator.block_callback)
        syncer.add_filter(gas_filter)
        try:
            syncer.loop(0.0, conn)
        except NoBlockForYou:
            # Treated as the normal end of the history run.
            logg.info('history done at {}'.format(syncer.backend.get()))

    # Follow the head starting from the block after the height sampled at startup.
    syncer_backend = MemBackend(chain_spec, None)
    syncer_backend.set(start_block + 1, 0)
    syncer = HeadSyncer(syncer_backend, chain_interface, block_callback=gas_aggregator.block_callback)
    syncer.add_filter(gas_filter)
    syncer.loop(1.0, conn)


# Run the tracker only when executed as a script, not when imported.
if __name__ == '__main__':
    main()