import importlib
import logging
from celery.signals import (
worker_process_init,
worker_process_shutdown,
worker_init,
task_postrun,
)
from peek_plugin_base.worker import CeleryDbConn
from peek_plugin_base.worker.CeleryDbConn import getDbSession
logger = logging.getLogger(__name__)
class __WorkerInit:
# Store the data for the worker processes to initialise with
dbConnectString = None
dbEngineArgs = None
[docs]@worker_init.connect
def initWorkerConnString(sender, **kwargs):
logger.info("Setting worker process database connection string")
__WorkerInit.dbConnectString = sender.app.peekDbConnectString
__WorkerInit.dbEngineArgs = sender.app.peekDbEngineArgs
[docs]@worker_process_init.connect
def initWorkerProcessDbConn(**kwargs):
logger.debug("Creating unique database connection for worker process")
# The next call to CeleryDbConn.dbEngine property will create a new engine
# with this connection string
from peek_plugin_base.worker import CeleryDbConn
CeleryDbConn = importlib.reload(CeleryDbConn)
CeleryDbConn._dbConnectString = __WorkerInit.dbConnectString
CeleryDbConn._dbEngineArgs = __WorkerInit.dbEngineArgs
logger.info("Created unique database connection for worker process")
[docs]@worker_process_shutdown.connect
def shutdownWorkerProcessDbConn(**kwargs):
logger.debug("Closing database connectionn for worker.")
if CeleryDbConn.__ScopedSession:
getDbSession() # Ensure we have a session maker
CeleryDbConn.__ScopedSession.close_all()
if CeleryDbConn.__dbEngine:
CeleryDbConn.__dbEngine.dispose()
logger.info("Closed database connectionn for worker.")
[docs]@task_postrun.connect
def taskEndCloseSession(**kwargs):
CeleryDbConn.getDbSession().close()