Stateless settings update
This commit is contained in:
parent
fd90045762
commit
c4ac452e0f
@ -8,6 +8,7 @@ class SyncTimeRedisCache:
|
|||||||
self.state = RedisMonitor('cic-eth-tracker', host=host, port=port, db=db)
|
self.state = RedisMonitor('cic-eth-tracker', host=host, port=port, db=db)
|
||||||
self.state.register('lastseen', persist=True)
|
self.state.register('lastseen', persist=True)
|
||||||
|
|
||||||
|
|
||||||
def cache_time(self, block, tx):
|
def cache_time(self, block, tx):
|
||||||
v = self.state.get('lastseen')
|
v = self.state.get('lastseen')
|
||||||
if v != None:
|
if v != None:
|
||||||
@ -15,5 +16,3 @@ class SyncTimeRedisCache:
|
|||||||
if v > block.timestamp:
|
if v > block.timestamp:
|
||||||
return
|
return
|
||||||
self.state.set('lastseen', block.timestamp)
|
self.state.set('lastseen', block.timestamp)
|
||||||
|
|
||||||
|
|
||||||
|
@ -23,5 +23,3 @@ def state_change_callback(k, old_state, new_state):
|
|||||||
|
|
||||||
def filter_change_callback(k, old_state, new_state):
|
def filter_change_callback(k, old_state, new_state):
|
||||||
logg.log(logging.STATETRACE, 'filter change: {} {} -> {}'.format(k, old_state, new_state))
|
logg.log(logging.STATETRACE, 'filter change: {} {} -> {}'.format(k, old_state, new_state))
|
||||||
|
|
||||||
|
|
||||||
|
@ -14,7 +14,10 @@ import cic_base.cli
|
|||||||
|
|
||||||
# local imports
|
# local imports
|
||||||
#from cic_tracker.cache import SyncTimeRedisCache
|
#from cic_tracker.cache import SyncTimeRedisCache
|
||||||
from cic_tracker.settings import CICTrackerSettings
|
from cic_tracker.settings import (
|
||||||
|
CICTrackerSettings,
|
||||||
|
process_settings,
|
||||||
|
)
|
||||||
from cic_tracker.callback import (
|
from cic_tracker.callback import (
|
||||||
pre_callback,
|
pre_callback,
|
||||||
post_callback,
|
post_callback,
|
||||||
@ -53,7 +56,7 @@ config.add(args.until, '_UNTIL', True)
|
|||||||
|
|
||||||
def main():
|
def main():
|
||||||
settings = CICTrackerSettings()
|
settings = CICTrackerSettings()
|
||||||
settings.process(config)
|
process_settings(settings, config)
|
||||||
logg.debug('settings:\n' + str(settings))
|
logg.debug('settings:\n' + str(settings))
|
||||||
|
|
||||||
drv = ChainInterfaceDriver(
|
drv = ChainInterfaceDriver(
|
||||||
|
@ -9,7 +9,14 @@ from hexathon import (
|
|||||||
to_int as hex_to_int,
|
to_int as hex_to_int,
|
||||||
strip_0x,
|
strip_0x,
|
||||||
)
|
)
|
||||||
from cic_base import CICSettings
|
#from cic_base import CICSettings
|
||||||
|
from cic_base.settings import (
|
||||||
|
process_common,
|
||||||
|
process_database,
|
||||||
|
process_trusted_addresses,
|
||||||
|
process_registry,
|
||||||
|
process_celery,
|
||||||
|
)
|
||||||
from cic_sync_filter.callback import CallbackFilter
|
from cic_sync_filter.callback import CallbackFilter
|
||||||
|
|
||||||
# local imports
|
# local imports
|
||||||
@ -22,101 +29,15 @@ from cic_tracker.callback import (
|
|||||||
logg = logging.getLogger(__name__)
|
logg = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class CICTrackerSettings(CICSettings):
|
class CICTrackerSettings:
|
||||||
|
|
||||||
def process_sync_interface(self, config):
|
def __init__(self):
|
||||||
self.o['SYNCER_INTERFACE'] = EthChainInterface()
|
self.o = {}
|
||||||
|
self.get = self.o.get
|
||||||
|
|
||||||
def process_sync_range(self, config):
|
|
||||||
o = block_latest()
|
|
||||||
r = self.o['RPC'].do(o)
|
|
||||||
block_offset = int(strip_0x(r), 16) + 1
|
|
||||||
logg.info('network block height at startup is {}'.format(block_offset))
|
|
||||||
|
|
||||||
keep_alive = False
|
|
||||||
session_block_offset = 0
|
|
||||||
block_limit = 0
|
|
||||||
session_block_offset = int(config.get('SYNCER_OFFSET'))
|
|
||||||
|
|
||||||
until = int(config.get('_UNTIL'))
|
|
||||||
if until > 0:
|
|
||||||
if until <= session_block_offset:
|
|
||||||
raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, until))
|
|
||||||
block_limit = until
|
|
||||||
else:
|
|
||||||
keep_alive=True
|
|
||||||
block_limit = -1
|
|
||||||
|
|
||||||
if session_block_offset == -1:
|
|
||||||
session_block_offset = block_offset
|
|
||||||
elif not config.true('_KEEP_ALIVE'):
|
|
||||||
if block_limit == 0:
|
|
||||||
lock_limit = block_offset
|
|
||||||
|
|
||||||
self.o['SYNCER_OFFSET'] = session_block_offset
|
|
||||||
self.o['SYNCER_LIMIT'] = block_limit
|
|
||||||
|
|
||||||
|
|
||||||
def process_sync_store(self, config):
|
def set(self, k, v):
|
||||||
syncer_store_module = None
|
self.o[k] = v
|
||||||
syncer_store_class = None
|
|
||||||
if config.get('SYNCER_BACKEND') == 'fs':
|
|
||||||
syncer_store_module = importlib.import_module('chainsyncer.store.fs')
|
|
||||||
syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
|
|
||||||
elif config.get('SYNCER_BACKEND') == 'rocksdb':
|
|
||||||
syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
|
|
||||||
syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
|
|
||||||
else:
|
|
||||||
syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND'))
|
|
||||||
syncer_store_class = getattr(syncer_store_module, 'SyncStore')
|
|
||||||
|
|
||||||
logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
|
|
||||||
|
|
||||||
state_dir = os.path.join(config.get('SYNCER_DIR'), config.get('SYNCER_BACKEND'))
|
|
||||||
sync_store = syncer_store_class(state_dir, session_id=config.get('SYNCER_SESSION_ID'), state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
|
|
||||||
logg.info('sync session is {}'.format(sync_store.session_id))
|
|
||||||
|
|
||||||
self.o['SYNCER_STORE'] = sync_store
|
|
||||||
|
|
||||||
|
|
||||||
def __create_callback_filter(self, config, path):
|
|
||||||
callbacks = []
|
|
||||||
for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
|
|
||||||
task_split = cb.split(':')
|
|
||||||
task_name = task_split[0]
|
|
||||||
task_queue = config.get('CELERY_QUEUE')
|
|
||||||
if len(task_split) > 1:
|
|
||||||
task_queue = task_split[1]
|
|
||||||
|
|
||||||
r = self.__create_filter(config, path, 'CallbackFilter')
|
|
||||||
r.set_method(task_name)
|
|
||||||
|
|
||||||
|
|
||||||
def __create_filter(self, config, path, cls):
|
|
||||||
m = importlib.import_module(path)
|
|
||||||
o = getattr(m, cls)
|
|
||||||
r = o(self.o['CHAIN_SPEC'], self.o['CIC_REGISTRY'], config.get('CELERY_QUEUE'))
|
|
||||||
self.o['SYNCER_STORE'].register(r)
|
|
||||||
return r
|
|
||||||
|
|
||||||
|
|
||||||
def process_sync_filters(self, config):
|
|
||||||
for v in config.get('SYNCER_FILTER').split(','):
|
|
||||||
logg.debug('processing filter {}'.format(v))
|
|
||||||
(path, cls) = v.rsplit('.', maxsplit=1)
|
|
||||||
if cls == 'CallbackFilter':
|
|
||||||
self.__create_callback_filter(config, path)
|
|
||||||
else:
|
|
||||||
self.__create_filter(config, path, cls)
|
|
||||||
|
|
||||||
|
|
||||||
def process(self, config):
|
|
||||||
super(CICTrackerSettings, self).process(config)
|
|
||||||
self.process_sync_range(config)
|
|
||||||
self.process_sync_interface(config)
|
|
||||||
self.process_sync_store(config)
|
|
||||||
self.process_sync_filters(config)
|
|
||||||
|
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
@ -126,3 +47,119 @@ class CICTrackerSettings(CICSettings):
|
|||||||
for k in ks:
|
for k in ks:
|
||||||
s += '{}: {}\n'.format(k, self.o.get(k))
|
s += '{}: {}\n'.format(k, self.o.get(k))
|
||||||
return s
|
return s
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def process_sync_interface(settings, config):
|
||||||
|
ifc = EthChainInterface()
|
||||||
|
settings.set('SYNCER_INTERFACE', ifc)
|
||||||
|
|
||||||
|
return settings
|
||||||
|
|
||||||
|
|
||||||
|
def process_sync_range(settings, config):
|
||||||
|
o = block_latest()
|
||||||
|
r = settings.get('RPC').do(o)
|
||||||
|
block_offset = int(strip_0x(r), 16) + 1
|
||||||
|
logg.info('network block height at startup is {}'.format(block_offset))
|
||||||
|
|
||||||
|
keep_alive = False
|
||||||
|
session_block_offset = 0
|
||||||
|
block_limit = 0
|
||||||
|
session_block_offset = int(config.get('SYNCER_OFFSET'))
|
||||||
|
|
||||||
|
until = int(config.get('_UNTIL'))
|
||||||
|
if until > 0:
|
||||||
|
if until <= session_block_offset:
|
||||||
|
raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, until))
|
||||||
|
block_limit = until
|
||||||
|
else:
|
||||||
|
keep_alive=True
|
||||||
|
block_limit = -1
|
||||||
|
|
||||||
|
if session_block_offset == -1:
|
||||||
|
session_block_offset = block_offset
|
||||||
|
elif not config.true('_KEEP_ALIVE'):
|
||||||
|
if block_limit == 0:
|
||||||
|
lock_limit = block_offset
|
||||||
|
|
||||||
|
settings.set('SYNCER_OFFSET', session_block_offset)
|
||||||
|
settings.set('SYNCER_LIMIT', block_limit)
|
||||||
|
|
||||||
|
return settings
|
||||||
|
|
||||||
|
|
||||||
|
def process_sync_store(settings, config):
|
||||||
|
syncer_store_module = None
|
||||||
|
syncer_store_class = None
|
||||||
|
if config.get('SYNCER_BACKEND') == 'fs':
|
||||||
|
syncer_store_module = importlib.import_module('chainsyncer.store.fs')
|
||||||
|
syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
|
||||||
|
elif config.get('SYNCER_BACKEND') == 'rocksdb':
|
||||||
|
syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
|
||||||
|
syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
|
||||||
|
else:
|
||||||
|
syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND'))
|
||||||
|
syncer_store_class = getattr(syncer_store_module, 'SyncStore')
|
||||||
|
|
||||||
|
logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
|
||||||
|
|
||||||
|
state_dir = os.path.join(config.get('SYNCER_DIR'), config.get('SYNCER_BACKEND'))
|
||||||
|
sync_store = syncer_store_class(state_dir, session_id=config.get('SYNCER_SESSION_ID'), state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
|
||||||
|
logg.info('sync session is {}'.format(sync_store.session_id))
|
||||||
|
|
||||||
|
settings.set('SYNCER_STORE', sync_store)
|
||||||
|
|
||||||
|
return settings
|
||||||
|
|
||||||
|
|
||||||
|
def __create_callback_filter(settings, config, path):
|
||||||
|
callbacks = []
|
||||||
|
for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
|
||||||
|
task_split = cb.split(':')
|
||||||
|
task_name = task_split[0]
|
||||||
|
task_queue = config.get('CELERY_QUEUE')
|
||||||
|
if len(task_split) > 1:
|
||||||
|
task_queue = task_split[1]
|
||||||
|
|
||||||
|
r = __create_filter(settings, config, path, 'CallbackFilter')
|
||||||
|
r.set_method(task_name)
|
||||||
|
|
||||||
|
|
||||||
|
def __create_filter(settings, config, path, cls):
|
||||||
|
m = importlib.import_module(path)
|
||||||
|
o = getattr(m, cls)
|
||||||
|
r = o(
|
||||||
|
settings.get('CHAIN_SPEC'),
|
||||||
|
settings.get('CIC_REGISTRY'),
|
||||||
|
config.get('CELERY_QUEUE'),
|
||||||
|
)
|
||||||
|
settings.get('SYNCER_STORE').register(r)
|
||||||
|
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
def process_sync_filters(settings, config):
|
||||||
|
for v in config.get('SYNCER_FILTER').split(','):
|
||||||
|
logg.debug('processing filter {}'.format(v))
|
||||||
|
(path, cls) = v.rsplit('.', maxsplit=1)
|
||||||
|
if cls == 'CallbackFilter':
|
||||||
|
__create_callback_filter(settings, config, path)
|
||||||
|
else:
|
||||||
|
__create_filter(settings, config, path, cls)
|
||||||
|
|
||||||
|
return settings
|
||||||
|
|
||||||
|
|
||||||
|
def process_settings(settings, config):
|
||||||
|
settings = process_common(settings, config)
|
||||||
|
settings = process_trusted_addresses(settings, config)
|
||||||
|
settings = process_registry(settings, config)
|
||||||
|
settings = process_celery(settings, config)
|
||||||
|
settings = process_sync_range(settings, config)
|
||||||
|
settings = process_sync_interface(settings, config)
|
||||||
|
settings = process_sync_store(settings, config)
|
||||||
|
settings = process_sync_filters(settings, config)
|
||||||
|
settings = process_database(settings, config)
|
||||||
|
|
||||||
|
return settings
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
chainsyncer[rocksdb]~=0.3.4
|
chainsyncer[rocksdb]==0.3.4
|
||||||
chainlib-eth~=0.1.0
|
chainlib-eth==0.1.0
|
||||||
cic-sync-filter~=0.13.1
|
cic-sync-filter==0.13.1
|
||||||
cic-base[legacy]~=0.13.0
|
cic-base[legacy]==0.14.0
|
||||||
|
Loading…
Reference in New Issue
Block a user