cic-tracker/cic_tracker/settings.py

195 lines
6.8 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# standard imports
import logging
import importlib
import os
# external imports
from chainlib.chain import ChainSpec
from chainlib.eth.address import is_checksum_address
from chainlib.eth.block import block_latest
from cic_eth.registry import (
connect as connect_registry,
connect_declarator,
connect_token_registry,
)
from cic_eth.db import dsn_from_config
from cic_eth.db.models.base import SessionBase
from cic_eth_registry.error import UnknownContractError
import cic_eth.cli
from hexathon import (
to_int as hex_to_int,
strip_0x,
)
from cic_eth_registry import CICRegistry
# local imports
from cic_tracker.chain import EthChainInterface
from cic_tracker.callback import (
state_change_callback,
filter_change_callback,
)
logg = logging.getLogger(__name__)
class CICTrackerSettings:
def __init__(self):
self.o = {}
self.registry = None
self.get = self.o.get
def process_common(self, config):
self.o['CHAIN_SPEC'] = ChainSpec.from_chain_str(config.get('CHAIN_SPEC'))
rpc = cic_eth.cli.RPC.from_config(config)
self.o['RPC'] = rpc.get_default()
def process_celery(self, config):
cic_eth.cli.CeleryApp.from_config(config)
def process_database(self, config):
dsn = dsn_from_config(config)
SessionBase.connect(dsn, pool_size=16, debug=config.true('DATABASE_DEBUG'))
def process_trusted_addresses(self, config):
trusted_addresses_src = config.get('CIC_TRUST_ADDRESS')
if trusted_addresses_src == None:
raise InitializationError('At least one trusted address must be declared in CIC_TRUST_ADDRESS')
trusted_addresses = trusted_addresses_src.split(',')
for i, address in enumerate(trusted_addresses):
if not config.get('_UNSAFE'):
if not is_checksum_address(address):
raise ValueError('address {} at position {} is not a valid checksum address'.format(address, i))
else:
trusted_addresses[i] = to_checksum_address(address)
logg.info('using trusted address {}'.format(address))
self.o['TRUSTED_ADDRESSES'] = trusted_addresses
def process_registry(self, config):
registry = None
try:
registry = connect_registry(self.o['RPC'], self.o['CHAIN_SPEC'], config.get('CIC_REGISTRY_ADDRESS'))
except UnknownContractError as e:
pass
if registry == None:
raise InitializationError('Registry contract connection failed for {}: {}'.format(config.get('CIC_REGISTRY_ADDRESS'), e))
connect_declarator(self.o['RPC'], self.o['CHAIN_SPEC'], self.o['TRUSTED_ADDRESSES'])
connect_token_registry(self.o['RPC'], self.o['CHAIN_SPEC'])
self.o['CIC_REGISTRY'] = CICRegistry(self.o['CHAIN_SPEC'], self.o['RPC'])
def process_callback_filters(self, config):
self.o['CALLBACK_FILTERS'] = []
for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
task_split = cb.split(':')
if len(task_split) > 1:
task_queue = task_split[0]
callback_filter = CallbackFilter(chain_spec, task_split[1], task_queue)
self.o['CALLBACK_FILTERS'].append(callback_filter)
def process_sync_interface(self, config):
self.o['SYNC_INTERFACE'] = EthChainInterface()
def process_sync_range(self, config):
o = block_latest()
r = self.o['RPC'].do(o)
block_offset = int(strip_0x(r), 16) + 1
logg.info('network block height at startup is {}'.format(block_offset))
keep_alive = False
session_block_offset = 0
block_limit = 0
session_block_offset = int(config.get('SYNC_OFFSET'))
until = int(config.get('_UNTIL'))
if until > 0:
if until <= session_block_offset:
raise ValueError('sync termination block number must be later than offset ({} >= {})'.format(session_block_offset, until))
block_limit = until
else:
keep_alive=True
block_limit = -1
if session_block_offset == -1:
session_block_offset = block_offset
elif not config.true('_KEEP_ALIVE'):
if block_limit == 0:
lock_limit = block_offset
self.o['SYNC_OFFSET'] = session_block_offset
self.o['SYNC_LIMIT'] = block_limit
def process_sync_store(self, config):
syncer_store_module = None
syncer_store_class = None
if config.get('SYNC_BACKEND') == 'fs':
syncer_store_module = importlib.import_module('chainsyncer.store.fs')
syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
elif config.get('SYNC_BACKEND') == 'rocksdb':
syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
else:
syncer_store_module = importlib.import_module(config.get('SYNC_BACKEND'))
syncer_store_class = getattr(syncer_store_module, 'SyncStore')
logg.info('using engine {} module {}.{}'.format(config.get('SYNC_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
state_dir = os.path.join(config.get('SYNC_DIR'), config.get('SYNC_BACKEND'))
sync_store = syncer_store_class(state_dir, session_id=config.get('SYNC_SESSION_ID'), state_event_callback=state_change_callback, filter_state_event_callback=filter_change_callback)
logg.info('sync session is {}'.format(sync_store.session_id))
self.o['SYNC_STORE'] = sync_store
def process_sync_filters(self, config):
for v in config.get('SYNC_FILTER').split(','):
logg.debug('processing filter {}'.format(v))
(path, cls) = v.rsplit('.', maxsplit=1)
m = importlib.import_module(path)
o = getattr(m, cls)
m = o(self.o['CHAIN_SPEC'], self.o['CIC_REGISTRY'], config.get('CELERY_QUEUE'))
if v == 'cic_syncer.filter.callback.CallbackFilter':
m.set_method()
o.trusted_addresses = trusted_addresses
self.o['SYNC_STORE'].register(m)
def process_sync(self, config):
self.process_sync_range(config)
self.process_sync_interface(config)
self.process_sync_store(config)
self.process_sync_filters(config)
def process(self, config):
self.process_common(config)
self.process_celery(config)
self.process_database(config)
self.process_trusted_addresses(config)
self.process_registry(config)
self.process_sync(config)
def __str__(self):
ks = list(self.o.keys())
ks.sort()
s = ''
for k in ks:
s += '{}: {}\n'.format(k, self.o.get(k))
return s