cic-tracker/cic_tracker/settings.py

166 lines
4.9 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# standard imports
import logging
import importlib
import os
# external imports
from chainlib.eth.block import block_latest
from hexathon import (
to_int as hex_to_int,
strip_0x,
)
#from cic_base import CICSettings
from cic_base.settings import (
process_common,
process_database,
process_trusted_addresses,
process_registry,
process_celery,
)
from cic_sync_filter.callback import CallbackFilter
# local imports
from cic_tracker.chain import EthChainInterface
from cic_tracker.callback import (
state_change_callback,
filter_change_callback,
)
logg = logging.getLogger(__name__)
class CICTrackerSettings:
def __init__(self):
self.o = {}
self.get = self.o.get
def set(self, k, v):
self.o[k] = v
def __str__(self):
ks = list(self.o.keys())
ks.sort()
s = ''
for k in ks:
s += '{}: {}\n'.format(k, self.o.get(k))
return s
def process_sync_interface(settings, config):
ifc = EthChainInterface()
settings.set('SYNCER_INTERFACE', ifc)
return settings
def process_sync_range(settings, config):
o = block_latest()
r = settings.get('RPC').do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.info('network block height at startup is {}'.format(block_offset))
keep_alive = False
session_block_offset = 0
block_limit = 0
session_block_offset = int(config.get('SYNCER_OFFSET'))
until = int(config.get('_UNTIL'))
if until > 0:
if until <= session_block_offset:
raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, until))
block_limit = until
else:
keep_alive=True
block_limit = -1
if session_block_offset == -1:
session_block_offset = block_offset
elif not config.true('_KEEP_ALIVE'):
if block_limit == 0:
lock_limit = block_offset
settings.set('SYNCER_OFFSET', session_block_offset)
settings.set('SYNCER_LIMIT', block_limit)
return settings
def process_sync_store(settings, config):
syncer_store_module = None
syncer_store_class = None
if config.get('SYNCER_BACKEND') == 'fs':
syncer_store_module = importlib.import_module('chainsyncer.store.fs')
syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
elif config.get('SYNCER_BACKEND') == 'rocksdb':
syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
else:
syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND'))
syncer_store_class = getattr(syncer_store_module, 'SyncStore')
logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
state_dir = os.path.join(config.get('SYNCER_DIR'), config.get('SYNCER_BACKEND'))
sync_store = syncer_store_class(state_dir, session_id=config.get('SYNCER_SESSION_ID'), state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
logg.info('sync session is {}'.format(sync_store.session_id))
settings.set('SYNCER_STORE', sync_store)
return settings
def __create_callback_filter(settings, config, path):
callbacks = []
for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
task_split = cb.split(':')
task_name = task_split[0]
task_queue = config.get('CELERY_QUEUE')
if len(task_split) > 1:
task_queue = task_split[1]
r = __create_filter(settings, config, path, 'CallbackFilter')
r.set_method(task_name)
def __create_filter(settings, config, path, cls):
m = importlib.import_module(path)
o = getattr(m, cls)
r = o(
settings.get('CHAIN_SPEC'),
settings.get('CIC_REGISTRY'),
config.get('CELERY_QUEUE'),
)
settings.get('SYNCER_STORE').register(r)
return r
def process_sync_filters(settings, config):
for v in config.get('SYNCER_FILTER').split(','):
logg.debug('processing filter {}'.format(v))
(path, cls) = v.rsplit('.', maxsplit=1)
if cls == 'CallbackFilter':
__create_callback_filter(settings, config, path)
else:
__create_filter(settings, config, path, cls)
return settings
def process_settings(settings, config):
settings = process_common(settings, config)
settings = process_trusted_addresses(settings, config)
settings = process_registry(settings, config)
settings = process_celery(settings, config)
settings = process_sync_range(settings, config)
settings = process_sync_interface(settings, config)
settings = process_sync_store(settings, config)
settings = process_sync_filters(settings, config)
settings = process_database(settings, config)
return settings