Compare commits

...

3 Commits

Author SHA1 Message Date
lash
c667cf351d
Add changelog 2022-04-07 13:55:59 +00:00
lash
016da073b7
Add moving average, remove faulty names for previous averages 2022-04-07 13:54:45 +00:00
lash
23a1d7ccc9
Add moving average stub to gas tracker 2022-04-07 13:29:44 +00:00
4 changed files with 75 additions and 41 deletions

4
CHANGELOG Normal file
View File

@ -0,0 +1,4 @@
* 0.1.0
- Custom scope calculations
- Averages, lows and highs
- Stats HTTP server

View File

@ -3,12 +3,12 @@ import sys
import os import os
import logging import logging
import argparse import argparse
import uuid
# external imports # external imports
import confini import confini
from chainsyncer.backend.memory import MemBackend from chainsyncer.store.fs import SyncFsStore
from chainsyncer.driver.head import HeadSyncer from chainsyncer.driver.chain_interface import ChainInterfaceDriver
from chainsyncer.driver.history import HistorySyncer
from chainsyncer.filter import SyncFilter from chainsyncer.filter import SyncFilter
from chainsyncer.error import NoBlockForYou from chainsyncer.error import NoBlockForYou
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
@ -33,14 +33,20 @@ from eth_stat_syncer.store import (
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
default_config_dir = os.environ.get('CONFINI_DIR', './config') script_dir = os.path.realpath(os.path.dirname(__file__))
exec_dir = os.path.realpath(os.getcwd())
default_config_dir = os.environ.get('confini_dir', os.path.join(exec_dir, 'config'))
argparser = argparse.ArgumentParser() argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', '--config', dest='c', default=default_config_dir, type=str, help='rpc provider') argparser.add_argument('-c', '--config', dest='c', default=default_config_dir, type=str, help='rpc provider')
argparser.add_argument('-i', '--chain-spec', dest='i', default='evm:ethereum:1', type=str, help='chain spec') argparser.add_argument('-i', '--chain-spec', dest='i', default='evm:ethereum:1', type=str, help='chain spec')
argparser.add_argument('--start', type=int, help='number of blocks to sample at startup') argparser.add_argument('--moving', action='append', default=[], type=int, help='add moving average')
argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store gas cache')
argparser.add_argument('--state-dir', dest='state_dir', default=exec_dir, type=str, help='Directory to store sync state')
argparser.add_argument('--session-id', dest='session_id', type=str, help='Use state from specified session id')
argparser.add_argument('-v', action='store_true', help='be verbose') argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose') argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args() args = argparser.parse_args()
@ -58,13 +64,19 @@ args_override = {
'RPC_PROVIDER': getattr(args, 'p'), 'RPC_PROVIDER': getattr(args, 'p'),
} }
config.dict_override(args_override, 'cli flag') config.dict_override(args_override, 'cli flag')
config.add(args.start, '_START', True) config.add(args.offset, '_SYNC_OFFSET', True)
config.add(os.path.realpath(args.state_dir), '_STATE_DIR', True)
config.add(args.cache_dir, '_CACHE_DIR', True)
config.add(args.session_id, '_SESSION_ID', True)
config.add(args.moving, '_MOVING', True)
logg.debug('loaded config: {}\n'.format(config)) logg.debug('loaded config: {}\n'.format(config))
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
conn = EthHTTPConnection(args.p) conn = EthHTTPConnection(args.p)
if config.get('_SESSION_ID') == None:
config.add(str(uuid.uuid4()), '_SESSION_ID', True)
class GasPriceFilter(SyncFilter): class GasPriceFilter(SyncFilter):
@ -73,8 +85,9 @@ class GasPriceFilter(SyncFilter):
self.gas_aggregator = gas_aggregator self.gas_aggregator = gas_aggregator
def filter(self, conn, block, tx, db_session): def filter(self, conn, block, tx, db_session=None):
self.gas_aggregator.put(tx.gas_price) self.gas_aggregator.put(tx.gas_price)
return False
class EthChainInterface(ChainInterface): class EthChainInterface(ChainInterface):
@ -87,33 +100,35 @@ class EthChainInterface(ChainInterface):
def main(): def main():
gas_store = RunStore(basedir=config.get('STORE_BASE_DIR')) gas_store = RunStore(basedir=config.get('_CACHE_DIR'))
gas_aggregator = GasAggregator(gas_store, 360) cap = 360
try:
v = max(config.get('_MOVING'))
if v > cap:
cap = v
except ValueError:
pass
gas_aggregator = GasAggregator(gas_store, cap, moving=config.get('_MOVING'))
gas_filter = GasPriceFilter(chain_spec, gas_aggregator) gas_filter = GasPriceFilter(chain_spec, gas_aggregator)
o = block_latest() start_block = 0
r = conn.do(o) if config.get('_SYNC_OFFSET') != None:
n = int(r, 16) start_block = config.get('_SYNC_OFFSET')
start_block = n else:
logg.info('block height at start {}'.format(start_block)) o = block_latest()
r = conn.do(o)
n = int(r, 16)
start_block = n
logg.info('block height at start {}'.format(start_block))
chain_interface = EthChainInterface() chain_interface = EthChainInterface()
if config.get('_START') != None:
offset = start_block - config.get('_START')
syncer_backend = MemBackend.custom(chain_spec, start_block)
syncer_backend.set(offset, 0)
syncer = HistorySyncer(syncer_backend, chain_interface, block_callback=gas_aggregator.block_callback)
syncer.add_filter(gas_filter)
try:
syncer.loop(0.0, conn)
except NoBlockForYou:
logg.info('history done at {}'.format(syncer.backend.get()))
syncer_backend = MemBackend(chain_spec, None) sync_store = SyncFsStore(config.get('_STATE_DIR'), session_id=config.get('_SESSION_ID'))
syncer_backend.set(start_block + 1, 0) sync_store.register(gas_filter)
syncer = HeadSyncer(syncer_backend, chain_interface, block_callback=gas_aggregator.block_callback)
syncer.add_filter(gas_filter) drv = ChainInterfaceDriver(sync_store, chain_interface, offset=start_block, target=-1, block_callback=gas_aggregator.block_callback)
syncer.loop(1.0, conn)
r = drv.run(conn)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -40,7 +40,7 @@ class RunStore:
class GasAggregator: class GasAggregator:
def __init__(self, store, capacity): def __init__(self, store, capacity, moving=[]):
self.store = store self.store = store
self.avg = 0 self.avg = 0
self.count = 0 self.count = 0
@ -57,6 +57,15 @@ class GasAggregator:
self.local_high = 0 self.local_high = 0
self.local_low = 0 self.local_low = 0
self.moving = []
for v in moving:
if v > capacity:
raise ValueError('moving average {} requested but capacity is only {}'.format(v, capacity))
self.moving.append(v)
logg.info('will calculate moving average {}'.format(v))
logg.info('buffer capacity is {}'.format(capacity))
def put(self, v): def put(self, v):
self.local_aggr += v self.local_aggr += v
@ -95,25 +104,28 @@ class GasAggregator:
self.aggr += v self.aggr += v
self.avg = int(self.aggr / self.count) self.avg = int(self.aggr / self.count)
logg.info('added {} to aggregate {} new average {} from {} samples'.format(v, self.aggr, self.avg, self.count)) logg.info('added {} to aggregate {} new average {} from {} samples'.format(v, self.aggr, self.avg, self.count))
return True return True
def get(self, n=0): def get(self, n=0):
if n == 0: if n == 0:
n = self.capacity n = self.buffer_capacity
if n > self.count:
n = self.count
aggrs = { aggrs = {
'average': 0, 'average': 0,
'low': 0, 'low': 0,
'high': 0, 'high': 0,
} }
for o in [ for o in [
(self.buffer_average, 'average',), (self.buffer_average, 'average',),
(self.buffer_low, 'low',), (self.buffer_low, 'low',),
(self.buffer_high, 'high',), (self.buffer_high, 'high',),
]: ]:
cursor = self.buffer_cursor cursor = self.buffer_cursor
aggr = 0
i = 0 i = 0
while i < n: while i < n:
v = o[0][cursor] v = o[0][cursor]
@ -123,22 +135,25 @@ class GasAggregator:
cursor = self.buffer_capacity - 1 cursor = self.buffer_capacity - 1
i += 1 i += 1
return { r = {
'average': int(aggrs['average']/ n), 'average': int(aggrs['average']/ n),
'high': int(aggrs['high']/ n), 'high': int(aggrs['high']/ n),
'low': int(aggrs['low']/ n), 'low': int(aggrs['low']/ n),
} }
logg.debug('calc for {}: avg {} high {} low {}'.format(n, r['average'], r['high'], r['low']))
return r
def block_callback(self, block, tx): def block_callback(self, block, tx):
logg.info('synced {}'.format(block)) tx_count = len(block.txs)
logg.info('synced {} with {} txs'.format(block, tx_count))
if self.process(): if self.process():
last = self.get(1) last = self.get(tx_count)
self.store.put(last, 'block') self.store.put(last, 'block')
minute = self.get(6)
self.store.put(minute, 'minute')
hour = self.get(360)
self.store.put(hour, 'hour')
self.store.put({ self.store.put({
'average': int(self.aggr / self.count), 'average': int(self.aggr / self.count),
}, 'all') }, 'all')
for v in self.moving:
r = self.get(v)
self.store.put(r, str(v))

View File

@ -1,3 +1,3 @@
chainsyncer~=0.2.0 chainsyncer~=0.3.0
chainlib-eth>=0.1.0b1,<=0.1.0 chainlib-eth>=0.1.0b1,<=0.1.0
jsonrpc_std~=0.1.0 jsonrpc_std~=0.1.0