eth-stat-syncer/eth_stat_syncer/store.py

160 lines
4.8 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# standard imports
import os
import logging
import datetime
# Module-level logger, named after this module and parented by the root logger.
logg = logging.getLogger(__name__)
class RunStore:
    """Filesystem-backed store that keeps each value in its own text file.

    Files are laid out as ``<basedir>/eth_stat_syncerd/<postfix>/<name>``.
    """

    def __init__(self, basedir='/var/lib'):
        """Set up the store root.

        :param basedir: Directory under which the ``eth_stat_syncerd``
            store directory is located.
        """
        self.procstore = os.path.join(basedir, 'eth_stat_syncerd')

    def get_postfix_path(self, name, postfix):
        """Return the path of the file holding ``name`` in group ``postfix``.

        :param name: File name of the entry.
        :param postfix: Subdirectory of the store root the entry lives in.
        :returns: The joined file path (nothing is created or checked).
        """
        return os.path.join(self.procstore, postfix, name)

    def put(self, o, postfix):
        """Write every item of ``o`` to its own file in group ``postfix``.

        Existing files are overwritten. Missing directories are created.

        :param o: Mapping of entry name to value; each value is stored as
            ``str(value)``.
        :param postfix: Subdirectory of the store root to write into.
        :raises OSError: If a directory or file cannot be created or written.
        """
        for k in o:
            file_path = self.get_postfix_path(k, postfix)
            # exist_ok avoids the check-then-create race of stat + makedirs.
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            # Context manager guarantees the handle is closed even if the
            # write raises.
            with open(file_path, 'w') as f:
                f.write(str(o[k]))

    def get(self, name, postfix):
        """Return the stored text for ``name`` in group ``postfix``.

        :param name: File name of the entry.
        :param postfix: Subdirectory of the store root the entry lives in.
        :returns: Full file contents as a string.
        :raises FileNotFoundError: If the entry has never been written.
        """
        file_path = self.get_postfix_path(name, postfix)
        with open(file_path, 'r') as f:
            return f.read()
class GasAggregator:
    """Aggregate per-transaction samples into per-block and moving statistics.

    Samples are registered with :meth:`put`, folded into a single block entry
    (average, high, low) by :meth:`process`, and kept in three fixed-size ring
    buffers. :meth:`get` averages the most recent buffer entries, and
    :meth:`block_callback` persists the results through ``store``.
    """

    def __init__(self, store, capacity, moving=None):
        """Create an empty aggregator.

        :param store: Object exposing ``put(mapping, postfix)``; used by
            :meth:`block_callback` to persist results.
        :param capacity: Number of block entries each ring buffer holds.
        :param moving: Iterable of window sizes (in buffer entries) for which
            :meth:`block_callback` stores moving statistics. ``None`` means
            no moving windows.
        :raises ValueError: If a requested window is larger than ``capacity``.
        """
        self.store = store
        # Running average and count over every sample ever registered.
        self.avg = 0
        self.count = 0
        # Naive UTC creation time (same value utcnow() produced, without
        # relying on the deprecated call).
        self.timestamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        # Index of the ring-buffer slot the next block entry is written to.
        self.buffer_cursor = 0
        self.buffer_capacity = capacity
        self.buffer_average = [None] * self.buffer_capacity
        self.buffer_high = [None] * self.buffer_capacity
        self.buffer_low = [None] * self.buffer_capacity
        # Becomes True once the buffers have been seeded by the first block.
        self.initial = False
        # Sum of every sample ever registered and processed.
        self.aggr = 0
        # Accumulators for the block currently being collected.
        self.local_aggr = 0
        self.local_count = 0
        self.local_high = 0
        self.local_low = 0
        self.moving = []
        for v in moving or ():
            if v > capacity:
                raise ValueError('moving average {} requested but capacity is only {}'.format(v, capacity))
            self.moving.append(v)
            logg.info('will calculate moving average {}'.format(v))
        logg.info('buffer capacity is {}'.format(capacity))

    def put(self, v):
        """Register one sample for the block currently being collected.

        :param v: Numeric sample value.
        """
        if self.local_count == 0:
            # First sample of a block seeds both extremes, so values from a
            # previous block (or a legitimate 0 sample) cannot distort them.
            self.local_high = v
            self.local_low = v
        else:
            if v > self.local_high:
                self.local_high = v
            if v < self.local_low:
                self.local_low = v
        self.local_aggr += v
        self.local_count += 1
        self.count += 1

    def process(self):
        """Fold the samples collected since the last call into one block entry.

        The first processed block seeds every ring-buffer slot; later blocks
        overwrite the slot at the cursor and advance it. The running total
        and average over all samples are updated as well.

        :returns: ``False`` if no samples were collected (nothing changes),
            ``True`` otherwise.
        """
        if self.local_count == 0:
            logg.info('skipping 0 tx block')
            return False

        block_total = self.local_aggr
        block_count = self.local_count
        block_high = self.local_high
        block_low = self.local_low
        # Floor division keeps integer sums exact; int() keeps the result an
        # integer even if float samples were supplied.
        v = int(block_total // block_count)
        logg.info('calculated new block average {} from {} tx samples, low {} high {}'.format(v, block_count, block_low, block_high))

        # Reset every per-block accumulator so the next block starts clean.
        self.local_aggr = 0
        self.local_count = 0
        self.local_high = 0
        self.local_low = 0

        if not self.initial:
            # Seed the whole window with the first block so averages over any
            # window size are defined immediately.
            self.buffer_average[:] = [v] * self.buffer_capacity
            self.buffer_high[:] = [block_high] * self.buffer_capacity
            self.buffer_low[:] = [block_low] * self.buffer_capacity
            self.initial = True
        else:
            slot = self.buffer_cursor
            self.buffer_average[slot] = v
            self.buffer_low[slot] = block_low
            self.buffer_high[slot] = block_high
            self.buffer_cursor = (slot + 1) % self.buffer_capacity

        # self.count already includes this block's samples (see put), so the
        # running total must grow by the block's sample sum, not be rebuilt
        # from the previous average.
        self.aggr += block_total
        self.avg = int(self.aggr // self.count)
        logg.info('added {} to aggregate {} new average {} from {} samples'.format(block_total, self.aggr, self.avg, self.count))
        return True

    def get(self, n=0):
        """Average the ``n`` most recent block entries of each ring buffer.

        :param n: Number of entries to include; ``0`` selects the whole
            buffer. The value is clamped to the buffer capacity and to the
            total number of registered samples.
        :returns: Dict with integer ``average``, ``high`` and ``low`` keys.
            All three are 0 when no block has been processed yet.
        """
        if n == 0:
            n = self.buffer_capacity
        # NOTE(review): self.count counts samples, not blocks; the clamp to it
        # is kept from the original behaviour.
        n = min(n, self.buffer_capacity, self.count)
        if not self.initial or n <= 0:
            r = {'average': 0, 'high': 0, 'low': 0}
        else:
            # buffer_cursor is the next write slot, so the newest entry sits
            # one position before it; walk backwards from there.
            newest = self.buffer_cursor - 1
            slots = [(newest - i) % self.buffer_capacity for i in range(n)]
            r = {
                'average': int(sum(self.buffer_average[i] for i in slots) // n),
                'high': int(sum(self.buffer_high[i] for i in slots) // n),
                'low': int(sum(self.buffer_low[i] for i in slots) // n),
            }
        logg.debug('calc for {}: avg {} high {} low {}'.format(n, r['average'], r['high'], r['low']))
        return r

    def block_callback(self, block, tx):
        """Process the samples of a synced block and persist the statistics.

        When :meth:`process` reports new data, results are written to the
        store under ``'block'``, ``'all'`` and one group per configured
        moving window (named ``str(window)``).

        :param block: Synced block; must expose a ``txs`` sequence.
        :param tx: Unused by this method.
        """
        tx_count = len(block.txs)
        logg.info('synced {} with {} txs'.format(block, tx_count))
        if not self.process():
            return
        # NOTE(review): the window passed here is the block's transaction
        # count, while get() interprets it as a number of block entries.
        # Kept as-is; confirm whether a single-entry window was intended.
        self.store.put(self.get(tx_count), 'block')
        self.store.put({
            'average': self.avg,
            }, 'all')
        for v in self.moving:
            self.store.put(self.get(v), str(v))