diff --git a/cic_tracker/settings.py b/cic_tracker/settings.py index 86e2cb3..95062c1 100644 --- a/cic_tracker/settings.py +++ b/cic_tracker/settings.py @@ -24,16 +24,6 @@ logg = logging.getLogger(__name__) class CICTrackerSettings(CICSettings): - def process_sync_callback_filters(self, config): - self.o['CALLBACK_FILTERS'] = [] - for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','): - task_split = cb.split(':') - if len(task_split) > 1: - task_queue = task_split[0] - callback_filter = CallbackFilter(self.o['CHAIN_SPEC'], task_split[1], self.o['CELERY_QUEUE']) - self.o['CALLBACK_FILTERS'].append(callback_filter) - - def process_sync_interface(self, config): self.o['SYNCER_INTERFACE'] = EthChainInterface() @@ -90,19 +80,35 @@ class CICTrackerSettings(CICSettings): self.o['SYNCER_STORE'] = sync_store + def __create_callback_filter(self, config, path): + callbacks = [] + for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','): + task_split = cb.split(':') + task_name = task_split[0] + task_queue = config.get('CELERY_QUEUE') + if len(task_split) > 1: + task_queue = task_split[1] + + r = self.__create_filter(config, path, 'CallbackFilter') + r.set_method(task_name) + + + def __create_filter(self, config, path, cls): + m = importlib.import_module(path) + o = getattr(m, cls) + r = o(self.o['CHAIN_SPEC'], self.o['CIC_REGISTRY'], config.get('CELERY_QUEUE')) + self.o['SYNCER_STORE'].register(r) + return r + + def process_sync_filters(self, config): for v in config.get('SYNCER_FILTER').split(','): logg.debug('processing filter {}'.format(v)) (path, cls) = v.rsplit('.', maxsplit=1) - m = importlib.import_module(path) - o = getattr(m, cls) - m = o(self.o['CHAIN_SPEC'], self.o['CIC_REGISTRY'], config.get('CELERY_QUEUE')) - - if v == 'cic_syncer.filter.callback.CallbackFilter': - m.set_method() - o.trusted_addresses = trusted_addresses - - self.o['SYNCER_STORE'].register(m) + if cls == 'CallbackFilter': + self.__create_callback_filter(config, path) + else: + self.__create_filter(config, path, cls) def process(self, config): @@ -110,7 +116,6 @@ class CICTrackerSettings(CICSettings): self.process_sync_range(config) self.process_sync_interface(config) self.process_sync_store(config) - self.process_sync_callback_filters(config) self.process_sync_filters(config)