WIP Add callback reception in traffic handler, change redis callback to same dict as http

This commit is contained in:
nolash 2021-02-20 11:29:11 +01:00
parent e7958aaf9e
commit f2a0ef99ec
Signed by untrusted user who does not match committer: lash
GPG Key ID: 21D2E7BB88C2A746
2 changed files with 24 additions and 5 deletions

View File

@ -18,7 +18,11 @@ logg = celery_app.log.get_default_logger()
def redis(self, result, destination, status_code):
(host, port, db, channel) = destination.split(':')
r = redis_interface.Redis(host=host, port=port, db=db)
s = json.dumps(result)
data = {
'root_id': self.request.root_id,
'status': status_code,
'result': result,
}
logg.debug('redis callback on host {} port {} db {} channel {}'.format(host, port, db, channel))
r.publish(channel, s)
r.publish(channel, json.dumps(data))
r.close()

View File

@ -8,6 +8,7 @@ import uuid
import importlib
import copy
import random
import json
# external imports
import redis
@ -122,7 +123,11 @@ class TrafficItem:
self.method = item.do
self.uuid = uuid.uuid4()
self.ext = None
self.complete = False
self.result = None
def __str__(self):
return 'traffic item method {} uuid {}'.format(self.method, self.uuid)
class TrafficRouter:
@ -256,15 +261,25 @@ class Handler:
logg.info('traffic method {} completed immediately')
self.traffic_router.release(traffic_item)
traffic_item.ext = t
self.traffic_items[traffic_item.uuid] = traffic_item
self.traffic_items[traffic_item.ext] = traffic_item
# TODO: add drain
while True:
m = self.pubsub.get_message(timeout=0.01)
m = self.pubsub.get_message(timeout=0.1)
if m == None:
break
logg.debug('redis message {}'.format(m))
if m['type'] == 'message':
message_data = json.loads(m['data'])
uu = message_data['root_id']
match_item = self.traffic_items[uu]
self.traffic_router.release(match_item)
if message_data['status'] == 0:
logg.error('task item {} failed with error code {}'.format(match_item, message_data['status']))
else:
match_item['result'] = message_data['result']
logg.debug('got callback result: {}'.format(match_item))
def name(self):