73 lines
2.7 KiB
Python
73 lines
2.7 KiB
Python
|
# standard imports
|
||
|
import json
|
||
|
from datetime import timedelta
|
||
|
|
||
|
# third party imports
|
||
|
import celery
|
||
|
from celery.utils.log import get_logger
|
||
|
|
||
|
# local imports
|
||
|
from cic_ussd.db.models.base import SessionBase
|
||
|
from cic_ussd.db.models.ussd_session import UssdSession
|
||
|
from cic_ussd.error import SessionNotFoundError
|
||
|
from cic_ussd.session.ussd_session import UssdSession as InMemoryUssdSession
|
||
|
|
||
|
celery_app = celery.current_app
|
||
|
logg = get_logger(__file__)
|
||
|
|
||
|
|
||
|
@celery_app.task
|
||
|
def persist_session_to_db(external_session_id: str):
|
||
|
"""
|
||
|
This task initiates the saving of the session object to the database and it's removal from the in-memory storage.
|
||
|
:param external_session_id: The session id of the session to be saved.
|
||
|
:type external_session_id: str.
|
||
|
:return: The representation of the newly created database object or en error message if session is not found.
|
||
|
:rtype: str.
|
||
|
:raises SessionNotFoundError: If the session object is not found in memory.
|
||
|
:raises VersionTooLowError: If the session's version doesn't match the latest version.
|
||
|
"""
|
||
|
# create session
|
||
|
session = SessionBase.create_session()
|
||
|
|
||
|
# get ussd session in redis cache
|
||
|
in_memory_session = InMemoryUssdSession.redis_cache.get(external_session_id)
|
||
|
|
||
|
# process persistence to db
|
||
|
if in_memory_session:
|
||
|
in_memory_session = json.loads(in_memory_session)
|
||
|
in_db_ussd_session = session.query(UssdSession).filter_by(external_session_id=external_session_id).first()
|
||
|
if in_db_ussd_session:
|
||
|
in_db_ussd_session.update(
|
||
|
session=session,
|
||
|
user_input=in_memory_session.get('user_input'),
|
||
|
state=in_memory_session.get('state'),
|
||
|
version=in_memory_session.get('version'),
|
||
|
)
|
||
|
else:
|
||
|
in_db_ussd_session = UssdSession(
|
||
|
external_session_id=external_session_id,
|
||
|
service_code=in_memory_session.get('service_code'),
|
||
|
msisdn=in_memory_session.get('msisdn'),
|
||
|
user_input=in_memory_session.get('user_input'),
|
||
|
state=in_memory_session.get('state'),
|
||
|
version=in_memory_session.get('version'),
|
||
|
)
|
||
|
|
||
|
# handle the updating of session data for persistence to db
|
||
|
session_data = in_memory_session.get('session_data')
|
||
|
|
||
|
if session_data:
|
||
|
for key, value in session_data.items():
|
||
|
in_db_ussd_session.set_data(key=key, value=value, session=session)
|
||
|
|
||
|
session.add(in_db_ussd_session)
|
||
|
InMemoryUssdSession.redis_cache.expire(external_session_id, timedelta(minutes=1))
|
||
|
else:
|
||
|
session.close()
|
||
|
raise SessionNotFoundError('Session does not exist!')
|
||
|
|
||
|
session.commit()
|
||
|
session.close()
|
||
|
|