Add bound averages, local low and high

This commit is contained in:
nolash 2021-04-08 21:48:00 +02:00
parent 5b47ab5661
commit a72fcbe919
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
2 changed files with 95 additions and 34 deletions

View File

@ -3,7 +3,6 @@ import sys
import os import os
import logging import logging
import argparse import argparse
import datetime
# external imports # external imports
import confini import confini
@ -18,6 +17,9 @@ from chainlib.chain import ChainSpec
from chainlib.eth.connection import EthHTTPConnection from chainlib.eth.connection import EthHTTPConnection
from chainlib.eth.block import block_latest from chainlib.eth.block import block_latest
# local imports
from eth_stat_syncer.store import GasAggregator
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
@ -54,46 +56,20 @@ chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
conn = EthHTTPConnection(args.p) conn = EthHTTPConnection(args.p)
class GasStore:
def __init__(self, store):
self.store = store
self.avg = 0
self.count = 0
self.timestamp = datetime.datetime.utcnow()
def put(self, v):
aggr = self.avg * self.count
aggr += v
self.count += 1
self.avg = int(aggr / self.count)
logg.info('added {} to aggregate {} new average {} from {} samples'.format(v, aggr, self.avg, self.count))
return self.avg
def get(self):
return self.avg
class GasPriceFilter(SyncFilter): class GasPriceFilter(SyncFilter):
def __init__(self, chain_spec, gas_store): def __init__(self, chain_spec, gas_aggregator):
self.chain_spec = chain_spec self.chain_spec = chain_spec
self.gas_store = gas_store self.gas_aggregator = gas_aggregator
def filter(self, conn, block, tx, db_session): def filter(self, conn, block, tx, db_session):
self.gas_store.put(tx.gas_price) self.gas_aggregator.put(tx.gas_price)
def block_callback(block, tx):
logg.info('synced {}'.format(block))
def main(): def main():
gas_store = GasStore(None) gas_aggregator = GasAggregator(360)
gas_filter = GasPriceFilter(chain_spec, gas_store) gas_filter = GasPriceFilter(chain_spec, gas_aggregator)
o = block_latest() o = block_latest()
r = conn.do(o) r = conn.do(o)
@ -105,7 +81,7 @@ def main():
offset = start_block - config.get('_START') offset = start_block - config.get('_START')
syncer_backend = MemBackend(chain_spec, None, target_block=start_block) syncer_backend = MemBackend(chain_spec, None, target_block=start_block)
syncer_backend.set(offset, 0) syncer_backend.set(offset, 0)
syncer = HistorySyncer(syncer_backend, block_callback=block_callback) syncer = HistorySyncer(syncer_backend, block_callback=gas_aggregator.block_callback)
syncer.add_filter(gas_filter) syncer.add_filter(gas_filter)
try: try:
syncer.loop(0.0, conn) syncer.loop(0.0, conn)
@ -115,7 +91,7 @@ def main():
syncer_backend = MemBackend(chain_spec, None) syncer_backend = MemBackend(chain_spec, None)
syncer_backend.set(start_block + 1, 0) syncer_backend.set(start_block + 1, 0)
syncer = HeadSyncer(syncer_backend, block_callback=block_callback) syncer = HeadSyncer(syncer_backend, block_callback=gas_aggregator.block_callback)
syncer.add_filter(gas_filter) syncer.add_filter(gas_filter)
syncer.loop(1.0, conn) syncer.loop(1.0, conn)

85
eth_stat_syncer/store.py Normal file
View File

@ -0,0 +1,85 @@
# standard imports
import logging
import datetime
logg = logging.getLogger().getChild(__name__)
class GasAggregator:
def __init__(self, capacity):
self.avg = 0
self.count = 0
self.timestamp = datetime.datetime.utcnow()
self.buffer_cursor = 0
self.buffer_capacity = capacity
self.buffer = [None] * self.buffer_capacity
self.initial = False
self.aggr = 0
self.local_aggr = 0
self.local_count = 0
self.local_high = 0
self.local_low = 0
def put(self, v):
self.local_aggr += v
self.local_count += 1
if self.local_low == 0:
self.local_low = v
if v > self.local_high:
self.local_high = v
elif v < self.local_low:
self.local_low = v
self.count += 1
def process(self):
if self.local_count == 0:
logg.info('skipping 0 tx block')
return False
v = int(self.local_aggr / self.local_count)
logg.info('calculated new block average {} from {} tx samples, low {} high {}'.format(v, self.local_count, self.local_low, self.local_high))
self.local_aggr = 0
self.local_count = 0
if not self.initial:
for i in range(self.buffer_capacity):
self.buffer[i] = v
self.initial = True
else:
self.buffer[self.buffer_cursor] = v
self.buffer_cursor += 1
self.buffer_cursor %= self.buffer_capacity
self.aggr = self.avg * self.count
self.aggr += v
self.avg = int(self.aggr / self.count)
logg.info('added {} to aggregate {} new average {} from {} samples'.format(v, self.aggr, self.avg, self.count))
return True
def get(self, n=0):
if n == 0:
n = self.capacity
cursor = self.buffer_cursor
aggr = 0
i = 0
while i < n:
v = self.buffer[cursor]
aggr += v
cursor -= 1
if cursor < 0:
cursor = self.buffer_capacity - 1
i += 1
return int(aggr / n)
def block_callback(self, block, tx):
logg.info('synced {}'.format(block))
if self.process():
v = self.get(10)
logg.info('last 6 average now {}'.format(v))
v = self.get(360)
logg.info('last 360 average now {}'.format(v))