From 23a1d7ccc9fd28428289424c77d32767db404e60 Mon Sep 17 00:00:00 2001 From: lash Date: Thu, 7 Apr 2022 13:29:44 +0000 Subject: [PATCH] Add moving average stub to gas tracker --- eth_stat_syncer/runnable/tracker.py | 88 ++++++++++++++++++++--------- eth_stat_syncer/store.py | 11 +++- requirements.txt | 2 +- 3 files changed, 71 insertions(+), 30 deletions(-) diff --git a/eth_stat_syncer/runnable/tracker.py b/eth_stat_syncer/runnable/tracker.py index 09764c9..af9a2eb 100644 --- a/eth_stat_syncer/runnable/tracker.py +++ b/eth_stat_syncer/runnable/tracker.py @@ -3,12 +3,12 @@ import sys import os import logging import argparse +import uuid # external imports import confini -from chainsyncer.backend.memory import MemBackend -from chainsyncer.driver.head import HeadSyncer -from chainsyncer.driver.history import HistorySyncer +from chainsyncer.store.fs import SyncFsStore +from chainsyncer.driver.chain_interface import ChainInterfaceDriver from chainsyncer.filter import SyncFilter from chainsyncer.error import NoBlockForYou from chainlib.chain import ChainSpec @@ -33,14 +33,20 @@ from eth_stat_syncer.store import ( logging.basicConfig(level=logging.WARNING) logg = logging.getLogger() -default_config_dir = os.environ.get('CONFINI_DIR', './config') +script_dir = os.path.realpath(os.path.dirname(__file__)) +exec_dir = os.path.realpath(os.getcwd()) +default_config_dir = os.environ.get('confini_dir', os.path.join(exec_dir, 'config')) argparser = argparse.ArgumentParser() argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') argparser.add_argument('-c', '--config', dest='c', default=default_config_dir, type=str, help='rpc provider') argparser.add_argument('-i', '--chain-spec', dest='i', default='evm:ethereum:1', type=str, help='chain spec') -argparser.add_argument('--start', type=int, help='number of blocks to sample at startup') +argparser.add_argument('--moving', action='append', default=[], type=int, help='add moving average') +argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') +argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store tx data') +argparser.add_argument('--state-dir', dest='state_dir', default=exec_dir, type=str, help='Directory to store sync state') +argparser.add_argument('--session-id', dest='session_id', type=str, help='Use state from specified session id') argparser.add_argument('-v', action='store_true', help='be verbose') argparser.add_argument('-vv', action='store_true', help='be more verbose') args = argparser.parse_args() @@ -58,13 +64,19 @@ args_override = { 'RPC_PROVIDER': getattr(args, 'p'), } config.dict_override(args_override, 'cli flag') -config.add(args.start, '_START', True) +config.add(args.offset, '_SYNC_OFFSET', True) +config.add(os.path.realpath(args.state_dir), '_STATE_DIR', True) +config.add(args.cache_dir, '_CACHE_DIR', True) +config.add(args.session_id, '_SESSION_ID', True) +config.add(args.moving, '_MOVING', True) logg.debug('loaded config: {}\n'.format(config)) chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) conn = EthHTTPConnection(args.p) +if config.get('_SESSION_ID') == None: + config.add(str(uuid.uuid4()), '_SESSION_ID', True) class GasPriceFilter(SyncFilter): @@ -73,8 +85,9 @@ class GasPriceFilter(SyncFilter): self.gas_aggregator = gas_aggregator - def filter(self, conn, block, tx, db_session): + def filter(self, conn, block, tx, db_session=None): self.gas_aggregator.put(tx.gas_price) + return False class EthChainInterface(ChainInterface): @@ -88,32 +101,51 @@ class EthChainInterface(ChainInterface): def main(): gas_store = RunStore(basedir=config.get('STORE_BASE_DIR')) - gas_aggregator = GasAggregator(gas_store, 360) + cap = 360 + try: + v = max(config.get('_MOVING')) + if v > cap: + cap = v + except ValueError: + pass + gas_aggregator = GasAggregator(gas_store, cap, moving=config.get('_MOVING')) gas_filter = GasPriceFilter(chain_spec, gas_aggregator) - o = block_latest() - r = conn.do(o) - n = int(r, 16) - start_block = n - logg.info('block height at start {}'.format(start_block)) + start_block = 0 + if config.get('_SYNC_OFFSET') != None: + start_block = config.get('_SYNC_OFFSET') + else: + o = block_latest() + r = conn.do(o) + n = int(r, 16) + start_block = n + logg.info('block height at start {}'.format(start_block)) chain_interface = EthChainInterface() - if config.get('_START') != None: - offset = start_block - config.get('_START') - syncer_backend = MemBackend.custom(chain_spec, start_block) - syncer_backend.set(offset, 0) - syncer = HistorySyncer(syncer_backend, chain_interface, block_callback=gas_aggregator.block_callback) - syncer.add_filter(gas_filter) - try: - syncer.loop(0.0, conn) - except NoBlockForYou: - logg.info('history done at {}'.format(syncer.backend.get())) - syncer_backend = MemBackend(chain_spec, None) - syncer_backend.set(start_block + 1, 0) - syncer = HeadSyncer(syncer_backend, chain_interface, block_callback=gas_aggregator.block_callback) - syncer.add_filter(gas_filter) - syncer.loop(1.0, conn) + sync_store = SyncFsStore(config.get('_STATE_DIR'), session_id=config.get('_SESSION_ID')) + sync_store.register(gas_filter) + + drv = ChainInterfaceDriver(sync_store, chain_interface, offset=start_block, target=-1, block_callback=gas_aggregator.block_callback) + + r = drv.run(conn) + +# if config.get('_START') != None: +# offset = start_block - config.get('_START') +# syncer_backend = MemBackend.custom(chain_spec, start_block) +# syncer_backend.set(offset, 0) +# syncer = HistorySyncer(syncer_backend, chain_interface, block_callback=gas_aggregator.block_callback) +# syncer.add_filter(gas_filter) +# try: +# syncer.loop(0.0, conn) +# except NoBlockForYou: +# logg.info('history done at {}'.format(syncer.backend.get())) + +# syncer_backend = MemBackend(chain_spec, None) +# syncer_backend.set(start_block + 1, 0) +# syncer = HeadSyncer(syncer_backend, chain_interface, block_callback=gas_aggregator.block_callback) +# syncer.add_filter(gas_filter) +# syncer.loop(1.0, conn) if __name__ == '__main__': diff --git a/eth_stat_syncer/store.py b/eth_stat_syncer/store.py index 2aa3a16..ef81df5 100644 --- a/eth_stat_syncer/store.py +++ b/eth_stat_syncer/store.py @@ -40,7 +40,7 @@ class RunStore: class GasAggregator: - def __init__(self, store, capacity): + def __init__(self, store, capacity, moving=[]): self.store = store self.avg = 0 self.count = 0 @@ -57,6 +57,15 @@ class GasAggregator: self.local_high = 0 self.local_low = 0 + self.moving = [] + for v in moving: + if v > capacity: + raise ValueError('moving average {} requested but capacity is only {}'.format(v, capacity)) + self.moving.append(v) + logg.info('will calculate moving average {}'.format(v)) + + logg.info('buffer capacity is {}'.format(capacity)) + def put(self, v): self.local_aggr += v diff --git a/requirements.txt b/requirements.txt index 9a96cc9..015c127 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -chainsyncer~=0.2.0 +chainsyncer~=0.3.0 chainlib-eth>=0.1.0b1,<=0.1.0 jsonrpc_std~=0.1.0