cic-internal-integration/apps/cic-ussd/cic_ussd/cache.py

61 lines
1.2 KiB
Python
Raw Normal View History

# standard imports
import hashlib
import logging
2021-11-29 16:04:50 +01:00
from typing import Union
# external imports
from cic_types.condiments import MetadataPointer
2021-02-06 16:13:47 +01:00
from redis import Redis
2021-11-29 16:04:50 +01:00
# Module-level logger; it is named after this file's path (__file__).
logg = logging.getLogger(__file__)
2021-02-06 16:13:47 +01:00
2021-08-06 18:29:01 +02:00
class Cache:
    """Holder for the cache client shared by the helper functions in this module."""

    # Class-level client handle read by cache_data() and get_cached_data().
    # It starts out as None; presumably it is assigned a connected client
    # elsewhere before those helpers are called - confirm against the caller.
    store: Redis = None
def cache_data(key: str, data: Union[bytes, float, int, str]):
    """Store a value in the shared cache under the given key.

    The value is written with ``set`` and the key is then passed to
    ``persist`` so that no expiry remains attached to it.

    :param key: Name under which the value is stored.
    :type key: str
    :param data: Value to store.
    :type data: Union[bytes, float, int, str]
    :return: None
    :rtype: None
    """
    cache = Cache.store
    cache.set(name=key, value=data)
    # Drop any time-to-live on the key so the entry does not expire.
    cache.persist(name=key)
    logg.debug(f'caching: {data} with key: {key}.')
def get_cached_data(key: str):
    """Look up the value stored in the shared cache under the given key.

    :param key: Name of the cache entry to read.
    :type key: str
    :return: Whatever the cache client's ``get`` returns for that key.
    :rtype: presumably bytes, or None when the key is absent - confirm
        against the configured client.
    """
    return Cache.store.get(name=key)
2021-11-29 16:04:50 +01:00
def cache_data_key(identifier: Union[list, bytes], salt: MetadataPointer):
    """Derive a hex-encoded SHA-256 cache key from an identifier and a salt.

    When ``identifier`` is a list, its items are fed to the hash in order with
    no separator between them; otherwise ``identifier`` itself is hashed. The
    salt's value is appended, UTF-8 encoded, unless the salt is
    ``MetadataPointer.NONE``.

    :param identifier: A bytes-like value, or a list of such values, to hash.
    :type identifier: Union[list, bytes]
    :param salt: Pointer whose ``value`` is mixed into the hash.
    :type salt: MetadataPointer
    :return: Hexadecimal digest of the hashed input.
    :rtype: str
    """
    # Treat a lone identifier as a one-item sequence so both shapes share a path.
    parts = identifier if isinstance(identifier, list) else [identifier]

    hasher = hashlib.sha256()
    for part in parts:
        hasher.update(part)

    if salt != MetadataPointer.NONE:
        hasher.update(salt.value.encode("utf-8"))

    return hasher.hexdigest()