Compare commits

...

7 Commits

Author SHA1 Message Date
lash c43cb44a60
Remove bogus aggregate alias 2022-04-07 14:45:47 +00:00
lash c667cf351d
Add changelog 2022-04-07 13:55:59 +00:00
lash 016da073b7
Add moving average, remove faulty names for previous averages 2022-04-07 13:54:45 +00:00
lash 23a1d7ccc9
Add moving average stub to gas tracker 2022-04-07 13:29:44 +00:00
lash 26b01099eb
Improve accept handling in server 2022-03-08 10:17:01 +00:00
lash 212b06ea5c
Bump version 2022-03-08 09:56:44 +00:00
lash 748542cb0b
Rehabilitate to chaintool release candidates 2022-03-08 09:56:15 +00:00
6 changed files with 117 additions and 50 deletions

4
CHANGELOG Normal file
View File

@ -0,0 +1,4 @@
* 0.1.0
- Custom scope calculations
- Averages, lows and highs
- Stats HTTP server

View File

@ -13,6 +13,7 @@ from http.server import (
import confini import confini
from jsonrpc_std.parse import jsonrpc_from_str from jsonrpc_std.parse import jsonrpc_from_str
from jsonrpc_std.interface import jsonrpc_response from jsonrpc_std.interface import jsonrpc_response
from jsonrpc_std.error import JSONRPCException
# local imports # local imports
from eth_stat_syncer.store import RunStore from eth_stat_syncer.store import RunStore
@ -25,7 +26,7 @@ default_config_dir = os.environ.get('CONFINI_DIR', './config')
argparser = argparse.ArgumentParser() argparser = argparse.ArgumentParser()
argparser.add_argument('-c', '--config', dest='c', default=default_config_dir, type=str, help='rpc provider') argparser.add_argument('-c', '--config', dest='c', default=default_config_dir, type=str, help='rpc provider')
argparser.add_argument('--host', type=str, help='httpd host') argparser.add_argument('--host', type=str, help='httpd host')
argparser.add_argument('--port', type=str, help='httpd port') argparser.add_argument('--port', type=int, help='httpd port')
argparser.add_argument('--store-dir', dest='store_dir', type=str, help='syncerd data store base directory') argparser.add_argument('--store-dir', dest='store_dir', type=str, help='syncerd data store base directory')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose') argparser.add_argument('-v', action='store_true', help='be verbose')
@ -61,7 +62,22 @@ class StatRequestHandler(BaseHTTPRequestHandler):
self.end_headers() self.end_headers()
return return
if 'application/json' not in self.headers.get('Accept').split(','): accept = ['application/json']
try:
accept = self.headers.get('Accept').split(',')
except AttributeError:
pass
logg.debug('accept is {}'.format(accept))
nocomprendo = True
if '*/*' in accept:
nocomprendo = False
elif 'application/json' in accept:
nocomprendo = False
elif '*' in accept:
nocomprendo = False
if nocomprendo:
self.send_response(400, 'me json only speak') self.send_response(400, 'me json only speak')
self.end_headers() self.end_headers()
return return
@ -107,7 +123,7 @@ class StatRequestHandler(BaseHTTPRequestHandler):
self.send_response(200, 'alas with jsonrpc error') self.send_response(200, 'alas with jsonrpc error')
else: else:
r = self.runstore.get('high', 'minute') r = self.runstore.get('high', 'block')
r = int(r) r = int(r)
if r == 0: if r == 0:
r = 1 r = 1

View File

@ -3,19 +3,26 @@ import sys
import os import os
import logging import logging
import argparse import argparse
import uuid
# external imports # external imports
import confini import confini
from chainsyncer.backend.memory import MemBackend from chainsyncer.store.fs import SyncFsStore
from chainsyncer.driver import ( from chainsyncer.driver.chain_interface import ChainInterfaceDriver
HeadSyncer,
HistorySyncer,
)
from chainsyncer.filter import SyncFilter from chainsyncer.filter import SyncFilter
from chainsyncer.error import NoBlockForYou from chainsyncer.error import NoBlockForYou
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
from chainlib.eth.connection import EthHTTPConnection from chainlib.eth.connection import EthHTTPConnection
from chainlib.eth.block import block_latest from chainlib.interface import ChainInterface
from chainlib.eth.block import (
block_by_number,
Block,
block_latest,
)
from chainlib.eth.tx import (
receipt,
Tx,
)
# local imports # local imports
from eth_stat_syncer.store import ( from eth_stat_syncer.store import (
@ -26,14 +33,20 @@ from eth_stat_syncer.store import (
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
default_config_dir = os.environ.get('CONFINI_DIR', './config') script_dir = os.path.realpath(os.path.dirname(__file__))
exec_dir = os.path.realpath(os.getcwd())
default_config_dir = os.environ.get('confini_dir', os.path.join(exec_dir, 'config'))
argparser = argparse.ArgumentParser() argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', '--config', dest='c', default=default_config_dir, type=str, help='rpc provider') argparser.add_argument('-c', '--config', dest='c', default=default_config_dir, type=str, help='rpc provider')
argparser.add_argument('-i', '--chain-spec', dest='i', default='evm:ethereum:1', type=str, help='chain spec') argparser.add_argument('-i', '--chain-spec', dest='i', default='evm:ethereum:1', type=str, help='chain spec')
argparser.add_argument('--start', type=int, help='number of blocks to sample at startup') argparser.add_argument('--moving', action='append', default=[], type=int, help='add moving average')
argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store gas cache')
argparser.add_argument('--state-dir', dest='state_dir', default=exec_dir, type=str, help='Directory to store sync state')
argparser.add_argument('--session-id', dest='session_id', type=str, help='Use state from specified session id')
argparser.add_argument('-v', action='store_true', help='be verbose') argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose') argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args() args = argparser.parse_args()
@ -51,13 +64,19 @@ args_override = {
'RPC_PROVIDER': getattr(args, 'p'), 'RPC_PROVIDER': getattr(args, 'p'),
} }
config.dict_override(args_override, 'cli flag') config.dict_override(args_override, 'cli flag')
config.add(args.start, '_START', True) config.add(args.offset, '_SYNC_OFFSET', True)
config.add(os.path.realpath(args.state_dir), '_STATE_DIR', True)
config.add(args.cache_dir, '_CACHE_DIR', True)
config.add(args.session_id, '_SESSION_ID', True)
config.add(args.moving, '_MOVING', True)
logg.debug('loaded config: {}\n'.format(config)) logg.debug('loaded config: {}\n'.format(config))
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
conn = EthHTTPConnection(args.p) conn = EthHTTPConnection(args.p)
if config.get('_SESSION_ID') == None:
config.add(str(uuid.uuid4()), '_SESSION_ID', True)
class GasPriceFilter(SyncFilter): class GasPriceFilter(SyncFilter):
@ -66,37 +85,50 @@ class GasPriceFilter(SyncFilter):
self.gas_aggregator = gas_aggregator self.gas_aggregator = gas_aggregator
def filter(self, conn, block, tx, db_session): def filter(self, conn, block, tx, db_session=None):
self.gas_aggregator.put(tx.gas_price) self.gas_aggregator.put(tx.gas_price)
return False
class EthChainInterface(ChainInterface):
def __init__(self):
self._block_by_number = block_by_number
self._block_from_src = Block.from_src
self._tx_receipt = receipt
self._src_normalize = Tx.src_normalize
def main(): def main():
gas_store = RunStore(basedir=config.get('STORE_BASE_DIR')) gas_store = RunStore(basedir=config.get('_CACHE_DIR'))
gas_aggregator = GasAggregator(gas_store, 360) cap = 360
try:
v = max(config.get('_MOVING'))
if v > cap:
cap = v
except ValueError:
pass
gas_aggregator = GasAggregator(gas_store, cap, moving=config.get('_MOVING'))
gas_filter = GasPriceFilter(chain_spec, gas_aggregator) gas_filter = GasPriceFilter(chain_spec, gas_aggregator)
o = block_latest() start_block = 0
r = conn.do(o) if config.get('_SYNC_OFFSET') != None:
n = int(r, 16) start_block = config.get('_SYNC_OFFSET')
start_block = n else:
logg.info('block height at start {}'.format(start_block)) o = block_latest()
r = conn.do(o)
n = int(r, 16)
start_block = n
logg.info('block height at start {}'.format(start_block))
if config.get('_START') != None: chain_interface = EthChainInterface()
offset = start_block - config.get('_START')
syncer_backend = MemBackend(chain_spec, None, target_block=start_block)
syncer_backend.set(offset, 0)
syncer = HistorySyncer(syncer_backend, block_callback=gas_aggregator.block_callback)
syncer.add_filter(gas_filter)
try:
syncer.loop(0.0, conn)
except NoBlockForYou:
logg.info('history done at {}'.format(syncer.backend.get()))
syncer_backend = MemBackend(chain_spec, None) sync_store = SyncFsStore(config.get('_STATE_DIR'), session_id=config.get('_SESSION_ID'))
syncer_backend.set(start_block + 1, 0) sync_store.register(gas_filter)
syncer = HeadSyncer(syncer_backend, block_callback=gas_aggregator.block_callback)
syncer.add_filter(gas_filter) drv = ChainInterfaceDriver(sync_store, chain_interface, offset=start_block, target=-1, block_callback=gas_aggregator.block_callback)
syncer.loop(1.0, conn)
r = drv.run(conn)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -40,7 +40,7 @@ class RunStore:
class GasAggregator: class GasAggregator:
def __init__(self, store, capacity): def __init__(self, store, capacity, moving=[]):
self.store = store self.store = store
self.avg = 0 self.avg = 0
self.count = 0 self.count = 0
@ -57,6 +57,15 @@ class GasAggregator:
self.local_high = 0 self.local_high = 0
self.local_low = 0 self.local_low = 0
self.moving = []
for v in moving:
if v > capacity:
raise ValueError('moving average {} requested but capacity is only {}'.format(v, capacity))
self.moving.append(v)
logg.info('will calculate moving average {}'.format(v))
logg.info('buffer capacity is {}'.format(capacity))
def put(self, v): def put(self, v):
self.local_aggr += v self.local_aggr += v
@ -95,25 +104,28 @@ class GasAggregator:
self.aggr += v self.aggr += v
self.avg = int(self.aggr / self.count) self.avg = int(self.aggr / self.count)
logg.info('added {} to aggregate {} new average {} from {} samples'.format(v, self.aggr, self.avg, self.count)) logg.info('added {} to aggregate {} new average {} from {} samples'.format(v, self.aggr, self.avg, self.count))
return True return True
def get(self, n=0): def get(self, n=0):
if n == 0: if n == 0:
n = self.capacity n = self.buffer_capacity
if n > self.count:
n = self.count
aggrs = { aggrs = {
'average': 0, 'average': 0,
'low': 0, 'low': 0,
'high': 0, 'high': 0,
} }
for o in [ for o in [
(self.buffer_average, 'average',), (self.buffer_average, 'average',),
(self.buffer_low, 'low',), (self.buffer_low, 'low',),
(self.buffer_high, 'high',), (self.buffer_high, 'high',),
]: ]:
cursor = self.buffer_cursor cursor = self.buffer_cursor
aggr = 0
i = 0 i = 0
while i < n: while i < n:
v = o[0][cursor] v = o[0][cursor]
@ -122,23 +134,26 @@ class GasAggregator:
if cursor < 0: if cursor < 0:
cursor = self.buffer_capacity - 1 cursor = self.buffer_capacity - 1
i += 1 i += 1
return { r = {
'average': int(aggrs['average']/ n), 'average': int(aggrs['average']/ n),
'high': int(aggrs['high']/ n), 'high': int(aggrs['high']/ n),
'low': int(aggrs['low']/ n), 'low': int(aggrs['low']/ n),
} }
logg.debug('calc for {}: avg {} high {} low {}'.format(n, r['average'], r['high'], r['low']))
return r
def block_callback(self, block, tx): def block_callback(self, block, tx):
logg.info('synced {}'.format(block)) tx_count = len(block.txs)
logg.info('synced {} with {} txs'.format(block, tx_count))
if self.process(): if self.process():
last = self.get(1) last = self.get(tx_count)
self.store.put(last, 'block') self.store.put(last, 'block')
minute = self.get(6)
self.store.put(minute, 'minute')
hour = self.get(360)
self.store.put(hour, 'hour')
self.store.put({ self.store.put({
'average': int(self.aggr / self.count), 'average': int(self.aggr / self.count),
}, 'all') }, 'all')
for v in self.moving:
r = self.get(v)
self.store.put(r, str(v))

View File

@ -1,3 +1,3 @@
chainsyncer~=0.0.2a3 chainsyncer~=0.3.0
chainlib~=0.0.3a1 chainlib-eth>=0.1.0b1,<=0.1.0
jsonrpc_std~=0.0.1a2 jsonrpc_std~=0.1.0

View File

@ -1,6 +1,6 @@
[metadata] [metadata]
name = eth-stat-syncer name = eth-stat-syncer
version = 0.0.1a3 version = 0.1.0
description = Cache live EVM blockchain stats description = Cache live EVM blockchain stats
author = Louis Holbrook author = Louis Holbrook
author_email = dev@holbrook.no author_email = dev@holbrook.no