Set up per-method callback filter registration

This commit is contained in:
lash 2022-04-25 06:02:29 +00:00
parent 21eeb4b90a
commit 38eca1299e
Signed by: lash
GPG Key ID: 21D2E7BB88C2A746
1 changed files with 25 additions and 20 deletions

View File

@ -24,16 +24,6 @@ logg = logging.getLogger(__name__)
class CICTrackerSettings(CICSettings):
def process_sync_callback_filters(self, config):
self.o['CALLBACK_FILTERS'] = []
for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
task_split = cb.split(':')
if len(task_split) > 1:
task_queue = task_split[0]
callback_filter = CallbackFilter(self.o['CHAIN_SPEC'], task_split[1], self.o['CELERY_QUEUE'])
self.o['CALLBACK_FILTERS'].append(callback_filter)
def process_sync_interface(self, config):
self.o['SYNCER_INTERFACE'] = EthChainInterface()
@ -90,19 +80,35 @@ class CICTrackerSettings(CICSettings):
self.o['SYNCER_STORE'] = sync_store
def __create_callback_filter(self, config, path):
callbacks = []
for cb in config.get('TASKS_TRANSFER_CALLBACKS', '').split(','):
task_split = cb.split(':')
task_name = task_split[0]
task_queue = config.get('CELERY_QUEUE')
if len(task_split) > 1:
task_queue = task_split[1]
r = self.__create_filter(config, path, 'CallbackFilter')
r.set_method(task_name)
def __create_filter(self, config, path, cls):
m = importlib.import_module(path)
o = getattr(m, cls)
r = o(self.o['CHAIN_SPEC'], self.o['CIC_REGISTRY'], config.get('CELERY_QUEUE'))
self.o['SYNCER_STORE'].register(r)
return r
def process_sync_filters(self, config):
for v in config.get('SYNCER_FILTER').split(','):
logg.debug('processing filter {}'.format(v))
(path, cls) = v.rsplit('.', maxsplit=1)
m = importlib.import_module(path)
o = getattr(m, cls)
m = o(self.o['CHAIN_SPEC'], self.o['CIC_REGISTRY'], config.get('CELERY_QUEUE'))
if v == 'cic_syncer.filter.callback.CallbackFilter':
m.set_method()
o.trusted_addresses = trusted_addresses
self.o['SYNCER_STORE'].register(m)
if cls == 'CallbackFilter':
self.__create_callback_filter(config, path)
else:
self.__create_filter(config, path, cls)
def process(self, config):
@ -110,7 +116,6 @@ class CICTrackerSettings(CICSettings):
self.process_sync_range(config)
self.process_sync_interface(config)
self.process_sync_store(config)
self.process_sync_callback_filters(config)
self.process_sync_filters(config)