cic-tracker/cic_tracker/settings.py

166 lines
4.9 KiB
Python
Raw Permalink Normal View History

2022-04-24 13:03:00 +02:00
# standard imports
import logging
import importlib
import os
# external imports
from chainlib.eth.block import block_latest
from hexathon import (
to_int as hex_to_int,
strip_0x,
)
2022-05-24 17:56:57 +02:00
#from cic_base import CICSettings
from cic_base.settings import (
process_common,
process_database,
process_trusted_addresses,
process_registry,
process_celery,
)
2022-04-24 17:03:21 +02:00
from cic_sync_filter.callback import CallbackFilter
2022-04-24 13:03:00 +02:00
# local imports
from cic_tracker.chain import EthChainInterface
from cic_tracker.callback import (
state_change_callback,
filter_change_callback,
)
logg = logging.getLogger(__name__)
2022-05-24 17:56:57 +02:00
class CICTrackerSettings:
2022-04-24 13:03:00 +02:00
2022-05-24 17:56:57 +02:00
def __init__(self):
self.o = {}
self.get = self.o.get
2022-04-24 13:03:00 +02:00
2022-05-24 17:56:57 +02:00
def set(self, k, v):
self.o[k] = v
2022-04-24 13:03:00 +02:00
2022-05-24 17:56:57 +02:00
def __str__(self):
ks = list(self.o.keys())
ks.sort()
s = ''
for k in ks:
s += '{}: {}\n'.format(k, self.o.get(k))
return s
2022-04-24 13:03:00 +02:00
2022-05-24 17:56:57 +02:00
def process_sync_interface(settings, config):
ifc = EthChainInterface()
settings.set('SYNCER_INTERFACE', ifc)
2022-04-24 13:03:00 +02:00
2022-05-24 17:56:57 +02:00
return settings
2022-05-24 17:56:57 +02:00
def process_sync_range(settings, config):
o = block_latest()
r = settings.get('RPC').do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.info('network block height at startup is {}'.format(block_offset))
2022-05-24 17:56:57 +02:00
keep_alive = False
session_block_offset = 0
block_limit = 0
session_block_offset = int(config.get('SYNCER_OFFSET'))
2022-05-24 17:56:57 +02:00
until = int(config.get('_UNTIL'))
if until > 0:
if until <= session_block_offset:
raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, until))
block_limit = until
else:
keep_alive=True
block_limit = -1
2022-05-24 17:56:57 +02:00
if session_block_offset == -1:
session_block_offset = block_offset
elif not config.true('_KEEP_ALIVE'):
if block_limit == 0:
lock_limit = block_offset
2022-04-24 13:03:00 +02:00
2022-05-24 17:56:57 +02:00
settings.set('SYNCER_OFFSET', session_block_offset)
settings.set('SYNCER_LIMIT', block_limit)
2022-04-24 13:03:00 +02:00
2022-05-24 17:56:57 +02:00
return settings
2022-04-24 13:03:00 +02:00
2022-05-24 17:56:57 +02:00
def process_sync_store(settings, config):
syncer_store_module = None
syncer_store_class = None
if config.get('SYNCER_BACKEND') == 'fs':
syncer_store_module = importlib.import_module('chainsyncer.store.fs')
syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
elif config.get('SYNCER_BACKEND') == 'rocksdb':
syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
else:
syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND'))
syncer_store_class = getattr(syncer_store_module, 'SyncStore')
logg.info('using engine {} module {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
state_dir = os.path.join(config.get('SYNCER_DIR'), config.get('SYNCER_BACKEND'))
sync_store = syncer_store_class(state_dir, session_id=config.get('SYNCER_SESSION_ID'), state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
logg.info('sync session is {}'.format(sync_store.session_id))
settings.set('SYNCER_STORE', sync_store)
return settings
def __create_callback_filter(settings, config, path):
callbacks = []
for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
task_split = cb.split(':')
task_name = task_split[0]
task_queue = config.get('CELERY_QUEUE')
if len(task_split) > 1:
task_queue = task_split[1]
r = __create_filter(settings, config, path, 'CallbackFilter')
r.set_method(task_name)
def __create_filter(settings, config, path, cls):
m = importlib.import_module(path)
o = getattr(m, cls)
r = o(
settings.get('CHAIN_SPEC'),
settings.get('CIC_REGISTRY'),
config.get('CELERY_QUEUE'),
)
settings.get('SYNCER_STORE').register(r)
return r
def process_sync_filters(settings, config):
for v in config.get('SYNCER_FILTER').split(','):
logg.debug('processing filter {}'.format(v))
(path, cls) = v.rsplit('.', maxsplit=1)
if cls == 'CallbackFilter':
__create_callback_filter(settings, config, path)
else:
__create_filter(settings, config, path, cls)
return settings
def process_settings(settings, config):
settings = process_common(settings, config)
settings = process_trusted_addresses(settings, config)
settings = process_registry(settings, config)
settings = process_celery(settings, config)
settings = process_sync_range(settings, config)
settings = process_sync_interface(settings, config)
settings = process_sync_store(settings, config)
settings = process_sync_filters(settings, config)
settings = process_database(settings, config)
return settings