Compare commits

...

7 Commits

Author SHA1 Message Date
lash c43cb44a60 Remove bogus aggregate alias 2022-04-07 14:45:47 +00:00
lash c667cf351d Add changelog 2022-04-07 13:55:59 +00:00
lash 016da073b7 Add moving average, remove faulty names for previous averages 2022-04-07 13:54:45 +00:00
lash 23a1d7ccc9 Add moving average stub to gas tracker 2022-04-07 13:29:44 +00:00
lash 26b01099eb Improve accept handling in server 2022-03-08 10:17:01 +00:00
lash 212b06ea5c Bump version 2022-03-08 09:56:44 +00:00
lash 748542cb0b Rehabilitate to chaintool release candidates 2022-03-08 09:56:15 +00:00
6 changed files with 117 additions and 50 deletions

CHANGELOG Normal file
View File

@@ -0,0 +1,4 @@
* 0.1.0
- Custom scope calculations
- Averages, lows and highs
- Stats HTTP server

View File

@@ -13,6 +13,7 @@ from http.server import (
import confini
from jsonrpc_std.parse import jsonrpc_from_str
from jsonrpc_std.interface import jsonrpc_response
from jsonrpc_std.error import JSONRPCException
# local imports
from eth_stat_syncer.store import RunStore
@@ -25,7 +26,7 @@ default_config_dir = os.environ.get('CONFINI_DIR', './config')
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', '--config', dest='c', default=default_config_dir, type=str, help='rpc provider')
argparser.add_argument('--host', type=str, help='httpd host')
argparser.add_argument('--port', type=str, help='httpd port')
argparser.add_argument('--port', type=int, help='httpd port')
argparser.add_argument('--store-dir', dest='store_dir', type=str, help='syncerd data store base directory')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('-v', action='store_true', help='be verbose')
@@ -61,7 +62,22 @@ class StatRequestHandler(BaseHTTPRequestHandler):
self.end_headers()
return
if 'application/json' not in self.headers.get('Accept').split(','):
accept = ['application/json']
try:
accept = self.headers.get('Accept').split(',')
except AttributeError:
pass
logg.debug('accept is {}'.format(accept))
nocomprendo = True
if '*/*' in accept:
nocomprendo = False
elif 'application/json' in accept:
nocomprendo = False
elif '*' in accept:
nocomprendo = False
if nocomprendo:
self.send_response(400, 'me json only speak')
self.end_headers()
return
@@ -107,7 +123,7 @@ class StatRequestHandler(BaseHTTPRequestHandler):
self.send_response(200, 'alas with jsonrpc error')
else:
r = self.runstore.get('high', 'minute')
r = self.runstore.get('high', 'block')
r = int(r)
if r == 0:
r = 1
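
The hunk above changes the Accept handling: a request with no Accept header now falls back to application/json, and 'application/json', '*/*' or '*' are all treated as acceptable, while anything else is answered with a 400. A condensed standalone restatement of that negotiation, for illustration only (not code from this diff; it keeps the handler's plain comma split and adds no header stripping):

    def accepts_json(accept_header):
        # No Accept header at all: treat it as JSON, like the new default above.
        if accept_header is None:
            return True
        # Same simple comma split as the handler; no quality values, no whitespace stripping.
        accept = accept_header.split(',')
        return bool({'application/json', '*/*', '*'} & set(accept))

    # accepts_json(None) -> True, accepts_json('*/*') -> True, accepts_json('text/html') -> False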

View File

@@ -3,19 +3,26 @@ import sys
import os
import logging
import argparse
import uuid
# external imports
import confini
from chainsyncer.backend.memory import MemBackend
from chainsyncer.driver import (
HeadSyncer,
HistorySyncer,
)
from chainsyncer.store.fs import SyncFsStore
from chainsyncer.driver.chain_interface import ChainInterfaceDriver
from chainsyncer.filter import SyncFilter
from chainsyncer.error import NoBlockForYou
from chainlib.chain import ChainSpec
from chainlib.eth.connection import EthHTTPConnection
from chainlib.eth.block import block_latest
from chainlib.interface import ChainInterface
from chainlib.eth.block import (
block_by_number,
Block,
block_latest,
)
from chainlib.eth.tx import (
receipt,
Tx,
)
# local imports
from eth_stat_syncer.store import (
@@ -26,14 +33,20 @@ from eth_stat_syncer.store import (
logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger()
default_config_dir = os.environ.get('CONFINI_DIR', './config')
script_dir = os.path.realpath(os.path.dirname(__file__))
exec_dir = os.path.realpath(os.getcwd())
default_config_dir = os.environ.get('confini_dir', os.path.join(exec_dir, 'config'))
argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', '--config', dest='c', default=default_config_dir, type=str, help='rpc provider')
argparser.add_argument('-i', '--chain-spec', dest='i', default='evm:ethereum:1', type=str, help='chain spec')
argparser.add_argument('--start', type=int, help='number of blocks to sample at startup')
argparser.add_argument('--moving', action='append', default=[], type=int, help='add moving average')
argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store gas cache')
argparser.add_argument('--state-dir', dest='state_dir', default=exec_dir, type=str, help='Directory to store sync state')
argparser.add_argument('--session-id', dest='session_id', type=str, help='Use state from specified session id')
argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args()
@@ -51,13 +64,19 @@ args_override = {
'RPC_PROVIDER': getattr(args, 'p'),
}
config.dict_override(args_override, 'cli flag')
config.add(args.start, '_START', True)
config.add(args.offset, '_SYNC_OFFSET', True)
config.add(os.path.realpath(args.state_dir), '_STATE_DIR', True)
config.add(args.cache_dir, '_CACHE_DIR', True)
config.add(args.session_id, '_SESSION_ID', True)
config.add(args.moving, '_MOVING', True)
logg.debug('loaded config: {}\n'.format(config))
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
conn = EthHTTPConnection(args.p)
if config.get('_SESSION_ID') == None:
config.add(str(uuid.uuid4()), '_SESSION_ID', True)
class GasPriceFilter(SyncFilter):
@@ -66,37 +85,50 @@ class GasPriceFilter(SyncFilter):
self.gas_aggregator = gas_aggregator
def filter(self, conn, block, tx, db_session):
def filter(self, conn, block, tx, db_session=None):
self.gas_aggregator.put(tx.gas_price)
return False
class EthChainInterface(ChainInterface):
def __init__(self):
self._block_by_number = block_by_number
self._block_from_src = Block.from_src
self._tx_receipt = receipt
self._src_normalize = Tx.src_normalize
def main():
gas_store = RunStore(basedir=config.get('STORE_BASE_DIR'))
gas_aggregator = GasAggregator(gas_store, 360)
gas_store = RunStore(basedir=config.get('_CACHE_DIR'))
cap = 360
try:
v = max(config.get('_MOVING'))
if v > cap:
cap = v
except ValueError:
pass
gas_aggregator = GasAggregator(gas_store, cap, moving=config.get('_MOVING'))
gas_filter = GasPriceFilter(chain_spec, gas_aggregator)
o = block_latest()
r = conn.do(o)
n = int(r, 16)
start_block = n
logg.info('block height at start {}'.format(start_block))
start_block = 0
if config.get('_SYNC_OFFSET') != None:
start_block = config.get('_SYNC_OFFSET')
else:
o = block_latest()
r = conn.do(o)
n = int(r, 16)
start_block = n
logg.info('block height at start {}'.format(start_block))
if config.get('_START') != None:
offset = start_block - config.get('_START')
syncer_backend = MemBackend(chain_spec, None, target_block=start_block)
syncer_backend.set(offset, 0)
syncer = HistorySyncer(syncer_backend, block_callback=gas_aggregator.block_callback)
syncer.add_filter(gas_filter)
try:
syncer.loop(0.0, conn)
except NoBlockForYou:
logg.info('history done at {}'.format(syncer.backend.get()))
chain_interface = EthChainInterface()
syncer_backend = MemBackend(chain_spec, None)
syncer_backend.set(start_block + 1, 0)
syncer = HeadSyncer(syncer_backend, block_callback=gas_aggregator.block_callback)
syncer.add_filter(gas_filter)
syncer.loop(1.0, conn)
sync_store = SyncFsStore(config.get('_STATE_DIR'), session_id=config.get('_SESSION_ID'))
sync_store.register(gas_filter)
drv = ChainInterfaceDriver(sync_store, chain_interface, offset=start_block, target=-1, block_callback=gas_aggregator.block_callback)
r = drv.run(conn)
if __name__ == '__main__':

View File

@@ -40,7 +40,7 @@ class RunStore:
class GasAggregator:
def __init__(self, store, capacity):
def __init__(self, store, capacity, moving=[]):
self.store = store
self.avg = 0
self.count = 0
@@ -57,6 +57,15 @@ class GasAggregator:
self.local_high = 0
self.local_low = 0
self.moving = []
for v in moving:
if v > capacity:
raise ValueError('moving average {} requested but capacity is only {}'.format(v, capacity))
self.moving.append(v)
logg.info('will calculate moving average {}'.format(v))
logg.info('buffer capacity is {}'.format(capacity))
def put(self, v):
self.local_aggr += v
@@ -95,25 +104,28 @@ class GasAggregator:
self.aggr += v
self.avg = int(self.aggr / self.count)
logg.info('added {} to aggregate {} new average {} from {} samples'.format(v, self.aggr, self.avg, self.count))
return True
def get(self, n=0):
if n == 0:
n = self.capacity
n = self.buffer_capacity
if n > self.count:
n = self.count
aggrs = {
'average': 0,
'low': 0,
'high': 0,
}
for o in [
(self.buffer_average, 'average',),
(self.buffer_low, 'low',),
(self.buffer_high, 'high',),
]:
cursor = self.buffer_cursor
aggr = 0
i = 0
while i < n:
v = o[0][cursor]
@@ -122,23 +134,26 @@ class GasAggregator:
if cursor < 0:
cursor = self.buffer_capacity - 1
i += 1
return {
r = {
'average': int(aggrs['average']/ n),
'high': int(aggrs['high']/ n),
'low': int(aggrs['low']/ n),
}
logg.debug('calc for {}: avg {} high {} low {}'.format(n, r['average'], r['high'], r['low']))
return r
def block_callback(self, block, tx):
logg.info('synced {}'.format(block))
tx_count = len(block.txs)
logg.info('synced {} with {} txs'.format(block, tx_count))
if self.process():
last = self.get(1)
last = self.get(tx_count)
self.store.put(last, 'block')
minute = self.get(6)
self.store.put(minute, 'minute')
hour = self.get(360)
self.store.put(hour, 'hour')
self.store.put({
'average': int(self.aggr / self.count),
}, 'all')
for v in self.moving:
r = self.get(v)
self.store.put(r, str(v))
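
GasAggregator keeps its samples in fixed-size ring buffers (average, low, high), and get() walks those buffers backwards from the current cursor to aggregate the most recent n entries; block_callback then snapshots the result per block, over the last 6 and 360 entries ('minute' and 'hour'), over everything seen ('all'), and once for each configured moving window. A minimal self-contained sketch of the same ring-buffer averaging idea, for illustration only (class and attribute names here are not the project's API):

    class RingAverage:
        def __init__(self, capacity):
            self.capacity = capacity
            self.buf = [0] * capacity
            self.cursor = 0   # next write position
            self.count = 0    # samples stored, capped at capacity

        def put(self, v):
            self.buf[self.cursor] = v
            self.cursor = (self.cursor + 1) % self.capacity
            self.count = min(self.count + 1, self.capacity)

        def get(self, n=0):
            # n == 0 means "whole buffer"; never read more samples than stored.
            if n == 0 or n > self.count:
                n = self.count
            if n == 0:
                return 0
            aggr = 0
            cursor = self.cursor - 1
            for _ in range(n):
                if cursor < 0:
                    cursor = self.capacity - 1
                aggr += self.buf[cursor]
                cursor -= 1
            return int(aggr / n)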

View File

@@ -1,3 +1,3 @@
chainsyncer~=0.0.2a3
chainlib~=0.0.3a1
jsonrpc_std~=0.0.1a2
chainsyncer~=0.3.0
chainlib-eth>=0.1.0b1,<=0.1.0
jsonrpc_std~=0.1.0

View File

@@ -1,6 +1,6 @@
[metadata]
name = eth-stat-syncer
version = 0.0.1a3
version = 0.1.0
description = Cache live EVM blockchain stats
author = Louis Holbrook
author_email = dev@holbrook.no