cic-internal-integration/apps/cic-ussd/cic_ussd/db/models/base.py

130 lines
4.0 KiB
Python
Raw Normal View History

2021-04-08 07:09:38 +02:00
# standard imports
import logging
2021-02-06 16:13:47 +01:00
import datetime
2021-04-08 07:09:38 +02:00
# external imports
2021-02-06 16:13:47 +01:00
from sqlalchemy import Column, Integer, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
2021-04-08 07:09:38 +02:00
from sqlalchemy.pool import (
StaticPool,
QueuePool,
AssertionPool,
NullPool,
)
# Module-level logger, named after this module.
logg = logging.getLogger(__name__)

# Declarative base from which every mapped model class in this package derives.
Model = declarative_base(name='Model')
class SessionBase(Model):
    """The base object for all SQLAlchemy enabled models. All other models must extend this.

    Apart from the columns shared by every model, the class keeps the
    database engine, the session factory and a registry of locally created
    sessions as static properties, and exposes static helpers to manage them.
    """
    __abstract__ = True

    # Timestamps are naive datetimes expressed in UTC (datetime.utcnow).
    created = Column(DateTime, default=datetime.datetime.utcnow)
    updated = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
    id = Column(Integer, primary_key=True)

    engine = None
    """Database connection engine of the running application"""
    sessionmaker = None
    """Factory object responsible for creating sessions from the connection pool"""
    transactional = True
    """Whether the database backend supports query transactions. Should be explicitly set by initialization code"""
    poolable = True
    """Whether the database backend supports connection pools. Should be explicitly set by initialization code"""
    procedural = True
    """Whether the database backend supports stored procedures"""
    localsessions = {}
    """Contains dictionary of sessions initiated by db model components"""

    @staticmethod
    def create_session():
        """Creates a new database session.

        SessionBase.connect() must have been called first, since it is what
        installs the session factory.

        :returns: New session created by the session factory
        """
        return SessionBase.sessionmaker()

    @staticmethod
    def _set_engine(engine):
        """Sets the database engine static property, and binds a new session factory to it.

        :param engine: Engine to use for all sessions created from now on
        """
        SessionBase.engine = engine
        SessionBase.sessionmaker = sessionmaker(bind=SessionBase.engine)

    @staticmethod
    def connect(dsn, pool_size=16, debug=False):
        """Create new database connection engine and connect to database backend.

        The connection pool implementation is selected as follows:

        - backend not poolable: the engine's default pool is used
        - pool_size > 1: QueuePool, with overflow of three times the pool size
        - pool_size == 0: NullPool (no pooling)
        - any other pool_size: AssertionPool if debug is set, otherwise StaticPool

        :param dsn: DSN string defining connection.
        :type dsn: str
        :param pool_size: Number of connections to keep in the pool
        :type pool_size: int
        :param debug: If set, the engine echoes all statements to the log
        :type debug: bool
        """
        if not SessionBase.poolable:
            logg.info('db connection not poolable')
            e = create_engine(
                dsn,
                echo=debug,
            )
        elif pool_size > 1:
            logg.info('db using queue pool')
            e = create_engine(
                dsn,
                max_overflow=pool_size*3,
                pool_pre_ping=True,
                pool_size=pool_size,
                pool_recycle=60,
                poolclass=QueuePool,
                echo=debug,
            )
        else:
            if pool_size == 0:
                poolclass = NullPool
            elif debug:
                poolclass = AssertionPool
            else:
                poolclass = StaticPool
            e = create_engine(
                dsn,
                poolclass=poolclass,
                echo=debug,
            )

        SessionBase._set_engine(e)

    @staticmethod
    def disconnect():
        """Disconnect from database and free resources.

        Does nothing if no engine is currently set, so the method may safely
        be called more than once.
        """
        if SessionBase.engine is None:
            return
        SessionBase.engine.dispose()
        SessionBase.engine = None

    @staticmethod
    def bind_session(session=None):
        """Return the session to use for a unit of work, creating one if needed.

        If a session is passed it is returned untouched and is NOT registered;
        it remains the responsibility of the caller. If no session is passed, a
        new one is created and registered in SessionBase.localsessions, so that
        a later call to release_session() commits and closes it.

        :param session: Existing session to reuse, if any
        :returns: The passed session, or a newly created one
        """
        if session is not None:
            return session

        localsession = SessionBase.create_session()
        localsession_key = str(id(localsession))
        logg.debug('creating new session {}'.format(localsession_key))
        SessionBase.localsessions[localsession_key] = localsession
        return localsession

    @staticmethod
    def release_session(session=None):
        """Commit and close a session that was created by bind_session().

        Sessions that were not created by bind_session() (including None) are
        left untouched. If the commit fails the exception is propagated, but
        the session is still closed and removed from the registry.

        :param session: Session to release
        """
        session_key = str(id(session))
        if SessionBase.localsessions.get(session_key) is None:
            return

        logg.debug('commit and destroy session {}'.format(session_key))
        try:
            session.commit()
        finally:
            session.close()
            # Drop the registry entry so closed sessions are not kept alive
            # for the lifetime of the process.
            SessionBase.localsessions.pop(session_key, None)