From c4ac452e0fed1fa2fa79330e15776acb1f4e84dc Mon Sep 17 00:00:00 2001 From: lash Date: Tue, 24 May 2022 15:56:57 +0000 Subject: [PATCH] Stateless settings update --- cic_tracker/cache.py | 3 +- cic_tracker/callback.py | 2 - cic_tracker/runnable/daemon.py | 7 +- cic_tracker/settings.py | 223 +++++++++++++++++++-------------- requirements.txt | 8 +- 5 files changed, 140 insertions(+), 103 deletions(-) diff --git a/cic_tracker/cache.py b/cic_tracker/cache.py index efd5c58..e56f866 100644 --- a/cic_tracker/cache.py +++ b/cic_tracker/cache.py @@ -8,6 +8,7 @@ class SyncTimeRedisCache: self.state = RedisMonitor('cic-eth-tracker', host=host, port=port, db=db) self.state.register('lastseen', persist=True) + def cache_time(self, block, tx): v = self.state.get('lastseen') if v != None: @@ -15,5 +16,3 @@ class SyncTimeRedisCache: if v > block.timestamp: return self.state.set('lastseen', block.timestamp) - - diff --git a/cic_tracker/callback.py b/cic_tracker/callback.py index b8d6ad9..0f24cbe 100644 --- a/cic_tracker/callback.py +++ b/cic_tracker/callback.py @@ -23,5 +23,3 @@ def state_change_callback(k, old_state, new_state): def filter_change_callback(k, old_state, new_state): logg.log(logging.STATETRACE, 'filter change: {} {} -> {}'.format(k, old_state, new_state)) - - diff --git a/cic_tracker/runnable/daemon.py b/cic_tracker/runnable/daemon.py index 9663a9f..f1f40c5 100644 --- a/cic_tracker/runnable/daemon.py +++ b/cic_tracker/runnable/daemon.py @@ -14,7 +14,10 @@ import cic_base.cli # local imports #from cic_tracker.cache import SyncTimeRedisCache -from cic_tracker.settings import CICTrackerSettings +from cic_tracker.settings import ( + CICTrackerSettings, + process_settings, + ) from cic_tracker.callback import ( pre_callback, post_callback, @@ -53,7 +56,7 @@ config.add(args.until, '_UNTIL', True) def main(): settings = CICTrackerSettings() - settings.process(config) + process_settings(settings, config) logg.debug('settings:\n' + str(settings)) drv = ChainInterfaceDriver( diff --git a/cic_tracker/settings.py b/cic_tracker/settings.py index 95062c1..7f95429 100644 --- a/cic_tracker/settings.py +++ b/cic_tracker/settings.py @@ -9,7 +9,14 @@ from hexathon import ( to_int as hex_to_int, strip_0x, ) -from cic_base import CICSettings +#from cic_base import CICSettings +from cic_base.settings import ( + process_common, + process_database, + process_trusted_addresses, + process_registry, + process_celery, + ) from cic_sync_filter.callback import CallbackFilter # local imports @@ -22,101 +29,15 @@ from cic_tracker.callback import ( logg = logging.getLogger(__name__) -class CICTrackerSettings(CICSettings): +class CICTrackerSettings: - def process_sync_interface(self, config): - self.o['SYNCER_INTERFACE'] = EthChainInterface() - - - def process_sync_range(self, config): - o = block_latest() - r = self.o['RPC'].do(o) - block_offset = int(strip_0x(r), 16) + 1 - logg.info('network block height at startup is {}'.format(block_offset)) - - keep_alive = False - session_block_offset = 0 - block_limit = 0 - session_block_offset = int(config.get('SYNCER_OFFSET')) - - until = int(config.get('_UNTIL')) - if until > 0: - if until <= session_block_offset: - raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, until)) - block_limit = until - else: - keep_alive=True - block_limit = -1 - - if session_block_offset == -1: - session_block_offset = block_offset - elif not config.true('_KEEP_ALIVE'): - if block_limit == 0: - lock_limit = block_offset - - self.o['SYNCER_OFFSET'] = session_block_offset - self.o['SYNCER_LIMIT'] = block_limit + def __init__(self): + self.o = {} + self.get = self.o.get - def process_sync_store(self, config): - syncer_store_module = None - syncer_store_class = None - if config.get('SYNCER_BACKEND') == 'fs': - syncer_store_module = importlib.import_module('chainsyncer.store.fs') - syncer_store_class = getattr(syncer_store_module, 'SyncFsStore') - elif config.get('SYNCER_BACKEND') == 'rocksdb': - syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb') - syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore') - else: - syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND')) - syncer_store_class = getattr(syncer_store_module, 'SyncStore') - - logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__)) - - state_dir = os.path.join(config.get('SYNCER_DIR'), config.get('SYNCER_BACKEND')) - sync_store = syncer_store_class(state_dir, session_id=config.get('SYNCER_SESSION_ID'), state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback) - logg.info('sync session is {}'.format(sync_store.session_id)) - - self.o['SYNCER_STORE'] = sync_store - - - def __create_callback_filter(self, config, path): - callbacks = [] - for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','): - task_split = cb.split(':') - task_name = task_split[0] - task_queue = config.get('CELERY_QUEUE') - if len(task_split) > 1: - task_queue = task_split[1] - - r = self.__create_filter(config, path, 'CallbackFilter') - r.set_method(task_name) - - - def __create_filter(self, config, path, cls): - m = importlib.import_module(path) - o = getattr(m, cls) - r = o(self.o['CHAIN_SPEC'], self.o['CIC_REGISTRY'], config.get('CELERY_QUEUE')) - self.o['SYNCER_STORE'].register(r) - return r - - - def process_sync_filters(self, config): - for v in config.get('SYNCER_FILTER').split(','): - logg.debug('processing filter {}'.format(v)) - (path, cls) = v.rsplit('.', maxsplit=1) - if cls == 'CallbackFilter': - self.__create_callback_filter(config, path) - else: - self.__create_filter(config, path, cls) - - - def process(self, config): - super(CICTrackerSettings, self).process(config) - self.process_sync_range(config) - self.process_sync_interface(config) - self.process_sync_store(config) - self.process_sync_filters(config) + def set(self, k, v): + self.o[k] = v def __str__(self): @@ -126,3 +47,119 @@ class CICTrackerSettings(CICSettings): for k in ks: s += '{}: {}\n'.format(k, self.o.get(k)) return s + + + +def process_sync_interface(settings, config): + ifc = EthChainInterface() + settings.set('SYNCER_INTERFACE', ifc) + + return settings + + +def process_sync_range(settings, config): + o = block_latest() + r = settings.get('RPC').do(o) + block_offset = int(strip_0x(r), 16) + 1 + logg.info('network block height at startup is {}'.format(block_offset)) + + keep_alive = False + session_block_offset = 0 + block_limit = 0 + session_block_offset = int(config.get('SYNCER_OFFSET')) + + until = int(config.get('_UNTIL')) + if until > 0: + if until <= session_block_offset: + raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, until)) + block_limit = until + else: + keep_alive=True + block_limit = -1 + + if session_block_offset == -1: + session_block_offset = block_offset + elif not config.true('_KEEP_ALIVE'): + if block_limit == 0: + lock_limit = block_offset + + settings.set('SYNCER_OFFSET', session_block_offset) + settings.set('SYNCER_LIMIT', block_limit) + + return settings + + +def process_sync_store(settings, config): + syncer_store_module = None + syncer_store_class = None + if config.get('SYNCER_BACKEND') == 'fs': + syncer_store_module = importlib.import_module('chainsyncer.store.fs') + syncer_store_class = getattr(syncer_store_module, 'SyncFsStore') + elif config.get('SYNCER_BACKEND') == 'rocksdb': + syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb') + syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore') + else: + syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND')) + syncer_store_class = getattr(syncer_store_module, 'SyncStore') + + logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__)) + + state_dir = os.path.join(config.get('SYNCER_DIR'), config.get('SYNCER_BACKEND')) + sync_store = syncer_store_class(state_dir, session_id=config.get('SYNCER_SESSION_ID'), state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback) + logg.info('sync session is {}'.format(sync_store.session_id)) + + settings.set('SYNCER_STORE', sync_store) + + return settings + + +def __create_callback_filter(settings, config, path): + callbacks = [] + for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','): + task_split = cb.split(':') + task_name = task_split[0] + task_queue = config.get('CELERY_QUEUE') + if len(task_split) > 1: + task_queue = task_split[1] + + r = __create_filter(settings, config, path, 'CallbackFilter') + r.set_method(task_name) + + +def __create_filter(settings, config, path, cls): + m = importlib.import_module(path) + o = getattr(m, cls) + r = o( + settings.get('CHAIN_SPEC'), + settings.get('CIC_REGISTRY'), + config.get('CELERY_QUEUE'), + ) + settings.get('SYNCER_STORE').register(r) + + return r + + +def process_sync_filters(settings, config): + for v in config.get('SYNCER_FILTER').split(','): + logg.debug('processing filter {}'.format(v)) + (path, cls) = v.rsplit('.', maxsplit=1) + if cls == 'CallbackFilter': + __create_callback_filter(settings, config, path) + else: + __create_filter(settings, config, path, cls) + + return settings + + +def process_settings(settings, config): + settings = process_common(settings, config) + settings = process_trusted_addresses(settings, config) + settings = process_registry(settings, config) + settings = process_celery(settings, config) + settings = process_sync_range(settings, config) + settings = process_sync_interface(settings, config) + settings = process_sync_store(settings, config) + settings = process_sync_filters(settings, config) + settings = process_database(settings, config) + + return settings diff --git a/requirements.txt b/requirements.txt index 6c0507a..77069e4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -chainsyncer[rocksdb]~=0.3.4 -chainlib-eth~=0.1.0 -cic-sync-filter~=0.13.1 -cic-base[legacy]~=0.13.0 +chainsyncer[rocksdb]==0.3.4 +chainlib-eth==0.1.0 +cic-sync-filter==0.13.1 +cic-base[legacy]==0.14.0