Compare commits

..

No commits in common. "c667cf351db90f7ea492a251de07ab6c3e7545cc" and "26b01099eb0be264bdf0fce69a2178d3ce394ac0" have entirely different histories.

4 changed files with 41 additions and 75 deletions

View File

@ -1,4 +0,0 @@
* 0.1.0
- Custom scope calculations
- Averages, lows and highs
- Stats HTTP server

View File

@ -3,12 +3,12 @@ import sys
import os import os
import logging import logging
import argparse import argparse
import uuid
# external imports # external imports
import confini import confini
from chainsyncer.store.fs import SyncFsStore from chainsyncer.backend.memory import MemBackend
from chainsyncer.driver.chain_interface import ChainInterfaceDriver from chainsyncer.driver.head import HeadSyncer
from chainsyncer.driver.history import HistorySyncer
from chainsyncer.filter import SyncFilter from chainsyncer.filter import SyncFilter
from chainsyncer.error import NoBlockForYou from chainsyncer.error import NoBlockForYou
from chainlib.chain import ChainSpec from chainlib.chain import ChainSpec
@ -33,20 +33,14 @@ from eth_stat_syncer.store import (
logging.basicConfig(level=logging.WARNING) logging.basicConfig(level=logging.WARNING)
logg = logging.getLogger() logg = logging.getLogger()
script_dir = os.path.realpath(os.path.dirname(__file__)) default_config_dir = os.environ.get('CONFINI_DIR', './config')
exec_dir = os.path.realpath(os.getcwd())
default_config_dir = os.environ.get('confini_dir', os.path.join(exec_dir, 'config'))
argparser = argparse.ArgumentParser() argparser = argparse.ArgumentParser()
argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider') argparser.add_argument('-p', '--provider', dest='p', type=str, help='rpc provider')
argparser.add_argument('-c', '--config', dest='c', default=default_config_dir, type=str, help='rpc provider') argparser.add_argument('-c', '--config', dest='c', default=default_config_dir, type=str, help='rpc provider')
argparser.add_argument('-i', '--chain-spec', dest='i', default='evm:ethereum:1', type=str, help='chain spec') argparser.add_argument('-i', '--chain-spec', dest='i', default='evm:ethereum:1', type=str, help='chain spec')
argparser.add_argument('--moving', action='append', default=[], type=int, help='add moving average') argparser.add_argument('--start', type=int, help='number of blocks to sample at startup')
argparser.add_argument('--offset', type=int, default=0, help='Start sync on this block')
argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration') argparser.add_argument('--env-prefix', default=os.environ.get('CONFINI_ENV_PREFIX'), dest='env_prefix', type=str, help='environment prefix for variables to overwrite configuration')
argparser.add_argument('--cache-dir', dest='cache_dir', type=str, help='Directory to store gas cache')
argparser.add_argument('--state-dir', dest='state_dir', default=exec_dir, type=str, help='Directory to store sync state')
argparser.add_argument('--session-id', dest='session_id', type=str, help='Use state from specified session id')
argparser.add_argument('-v', action='store_true', help='be verbose') argparser.add_argument('-v', action='store_true', help='be verbose')
argparser.add_argument('-vv', action='store_true', help='be more verbose') argparser.add_argument('-vv', action='store_true', help='be more verbose')
args = argparser.parse_args() args = argparser.parse_args()
@ -64,19 +58,13 @@ args_override = {
'RPC_PROVIDER': getattr(args, 'p'), 'RPC_PROVIDER': getattr(args, 'p'),
} }
config.dict_override(args_override, 'cli flag') config.dict_override(args_override, 'cli flag')
config.add(args.offset, '_SYNC_OFFSET', True) config.add(args.start, '_START', True)
config.add(os.path.realpath(args.state_dir), '_STATE_DIR', True)
config.add(args.cache_dir, '_CACHE_DIR', True)
config.add(args.session_id, '_SESSION_ID', True)
config.add(args.moving, '_MOVING', True)
logg.debug('loaded config: {}\n'.format(config)) logg.debug('loaded config: {}\n'.format(config))
chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC')) chain_spec = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
conn = EthHTTPConnection(args.p) conn = EthHTTPConnection(args.p)
if config.get('_SESSION_ID') == None:
config.add(str(uuid.uuid4()), '_SESSION_ID', True)
class GasPriceFilter(SyncFilter): class GasPriceFilter(SyncFilter):
@ -85,9 +73,8 @@ class GasPriceFilter(SyncFilter):
self.gas_aggregator = gas_aggregator self.gas_aggregator = gas_aggregator
def filter(self, conn, block, tx, db_session=None): def filter(self, conn, block, tx, db_session):
self.gas_aggregator.put(tx.gas_price) self.gas_aggregator.put(tx.gas_price)
return False
class EthChainInterface(ChainInterface): class EthChainInterface(ChainInterface):
@ -100,21 +87,10 @@ class EthChainInterface(ChainInterface):
def main(): def main():
gas_store = RunStore(basedir=config.get('_CACHE_DIR')) gas_store = RunStore(basedir=config.get('STORE_BASE_DIR'))
cap = 360 gas_aggregator = GasAggregator(gas_store, 360)
try:
v = max(config.get('_MOVING'))
if v > cap:
cap = v
except ValueError:
pass
gas_aggregator = GasAggregator(gas_store, cap, moving=config.get('_MOVING'))
gas_filter = GasPriceFilter(chain_spec, gas_aggregator) gas_filter = GasPriceFilter(chain_spec, gas_aggregator)
start_block = 0
if config.get('_SYNC_OFFSET') != None:
start_block = config.get('_SYNC_OFFSET')
else:
o = block_latest() o = block_latest()
r = conn.do(o) r = conn.do(o)
n = int(r, 16) n = int(r, 16)
@ -122,13 +98,22 @@ def main():
logg.info('block height at start {}'.format(start_block)) logg.info('block height at start {}'.format(start_block))
chain_interface = EthChainInterface() chain_interface = EthChainInterface()
if config.get('_START') != None:
offset = start_block - config.get('_START')
syncer_backend = MemBackend.custom(chain_spec, start_block)
syncer_backend.set(offset, 0)
syncer = HistorySyncer(syncer_backend, chain_interface, block_callback=gas_aggregator.block_callback)
syncer.add_filter(gas_filter)
try:
syncer.loop(0.0, conn)
except NoBlockForYou:
logg.info('history done at {}'.format(syncer.backend.get()))
sync_store = SyncFsStore(config.get('_STATE_DIR'), session_id=config.get('_SESSION_ID')) syncer_backend = MemBackend(chain_spec, None)
sync_store.register(gas_filter) syncer_backend.set(start_block + 1, 0)
syncer = HeadSyncer(syncer_backend, chain_interface, block_callback=gas_aggregator.block_callback)
drv = ChainInterfaceDriver(sync_store, chain_interface, offset=start_block, target=-1, block_callback=gas_aggregator.block_callback) syncer.add_filter(gas_filter)
syncer.loop(1.0, conn)
r = drv.run(conn)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -40,7 +40,7 @@ class RunStore:
class GasAggregator: class GasAggregator:
def __init__(self, store, capacity, moving=[]): def __init__(self, store, capacity):
self.store = store self.store = store
self.avg = 0 self.avg = 0
self.count = 0 self.count = 0
@ -57,15 +57,6 @@ class GasAggregator:
self.local_high = 0 self.local_high = 0
self.local_low = 0 self.local_low = 0
self.moving = []
for v in moving:
if v > capacity:
raise ValueError('moving average {} requested but capacity is only {}'.format(v, capacity))
self.moving.append(v)
logg.info('will calculate moving average {}'.format(v))
logg.info('buffer capacity is {}'.format(capacity))
def put(self, v): def put(self, v):
self.local_aggr += v self.local_aggr += v
@ -104,28 +95,25 @@ class GasAggregator:
self.aggr += v self.aggr += v
self.avg = int(self.aggr / self.count) self.avg = int(self.aggr / self.count)
logg.info('added {} to aggregate {} new average {} from {} samples'.format(v, self.aggr, self.avg, self.count)) logg.info('added {} to aggregate {} new average {} from {} samples'.format(v, self.aggr, self.avg, self.count))
return True return True
def get(self, n=0): def get(self, n=0):
if n == 0: if n == 0:
n = self.buffer_capacity n = self.capacity
if n > self.count:
n = self.count
aggrs = { aggrs = {
'average': 0, 'average': 0,
'low': 0, 'low': 0,
'high': 0, 'high': 0,
} }
for o in [ for o in [
(self.buffer_average, 'average',), (self.buffer_average, 'average',),
(self.buffer_low, 'low',), (self.buffer_low, 'low',),
(self.buffer_high, 'high',), (self.buffer_high, 'high',),
]: ]:
cursor = self.buffer_cursor cursor = self.buffer_cursor
aggr = 0
i = 0 i = 0
while i < n: while i < n:
v = o[0][cursor] v = o[0][cursor]
@ -135,25 +123,22 @@ class GasAggregator:
cursor = self.buffer_capacity - 1 cursor = self.buffer_capacity - 1
i += 1 i += 1
r = { return {
'average': int(aggrs['average']/ n), 'average': int(aggrs['average']/ n),
'high': int(aggrs['high']/ n), 'high': int(aggrs['high']/ n),
'low': int(aggrs['low']/ n), 'low': int(aggrs['low']/ n),
} }
logg.debug('calc for {}: avg {} high {} low {}'.format(n, r['average'], r['high'], r['low']))
return r
def block_callback(self, block, tx): def block_callback(self, block, tx):
tx_count = len(block.txs) logg.info('synced {}'.format(block))
logg.info('synced {} with {} txs'.format(block, tx_count))
if self.process(): if self.process():
last = self.get(tx_count) last = self.get(1)
self.store.put(last, 'block') self.store.put(last, 'block')
minute = self.get(6)
self.store.put(minute, 'minute')
hour = self.get(360)
self.store.put(hour, 'hour')
self.store.put({ self.store.put({
'average': int(self.aggr / self.count), 'average': int(self.aggr / self.count),
}, 'all') }, 'all')
for v in self.moving:
r = self.get(v)
self.store.put(r, str(v))

View File

@ -1,3 +1,3 @@
chainsyncer~=0.3.0 chainsyncer~=0.2.0
chainlib-eth>=0.1.0b1,<=0.1.0 chainlib-eth>=0.1.0b1,<=0.1.0
jsonrpc_std~=0.1.0 jsonrpc_std~=0.1.0