From f2a0ef99ec0fc7e4c20c0623252d9768fbcee806 Mon Sep 17 00:00:00 2001 From: nolash Date: Sat, 20 Feb 2021 11:29:11 +0100 Subject: [PATCH] WIP Add callback reception in traffic handler, change redis callback to same dict as http --- apps/cic-eth/cic_eth/callbacks/redis.py | 8 ++++++-- apps/contract-migration/scripts/traffic.py | 21 ++++++++++++++++++--- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/apps/cic-eth/cic_eth/callbacks/redis.py b/apps/cic-eth/cic_eth/callbacks/redis.py index 648edabb..745f9f8f 100644 --- a/apps/cic-eth/cic_eth/callbacks/redis.py +++ b/apps/cic-eth/cic_eth/callbacks/redis.py @@ -18,7 +18,11 @@ logg = celery_app.log.get_default_logger() def redis(self, result, destination, status_code): (host, port, db, channel) = destination.split(':') r = redis_interface.Redis(host=host, port=port, db=db) - s = json.dumps(result) + data = { + 'root_id': self.request.root_id, + 'status': status_code, + 'result': result, + } logg.debug('redis callback on host {} port {} db {} channel {}'.format(host, port, db, channel)) - r.publish(channel, s) + r.publish(channel, json.dumps(data)) r.close() diff --git a/apps/contract-migration/scripts/traffic.py b/apps/contract-migration/scripts/traffic.py index 5c9f1dae..ff674c2a 100644 --- a/apps/contract-migration/scripts/traffic.py +++ b/apps/contract-migration/scripts/traffic.py @@ -8,6 +8,7 @@ import uuid import importlib import copy import random +import json # external imports import redis @@ -122,7 +123,11 @@ class TrafficItem: self.method = item.do self.uuid = uuid.uuid4() self.ext = None - self.complete = False + self.result = None + + + def __str__(self): + return 'traffic item method {} uuid {}'.format(self.method, self.uuid) class TrafficRouter: @@ -256,15 +261,25 @@ class Handler: logg.info('traffic method {} completed immediately') self.traffic_router.release(traffic_item) traffic_item.ext = t - self.traffic_items[traffic_item.uuid] = traffic_item + self.traffic_items[traffic_item.ext] = traffic_item # TODO: add drain while True: - m = self.pubsub.get_message(timeout=0.01) + m = self.pubsub.get_message(timeout=0.1) if m == None: break logg.debug('redis message {}'.format(m)) + if m['type'] == 'message': + message_data = json.loads(m['data']) + uu = message_data['root_id'] + match_item = self.traffic_items[uu] + self.traffic_router.release(match_item) + if message_data['status'] == 0: + logg.error('task item {} failed with error code {}'.format(match_item, message_data['status'])) + else: + match_item['result'] = message_data['result'] + logg.debug('got callback result: {}'.format(match_item)) def name(self):