Source code for peek_plugin_base.worker.CeleryDbConnInit

import importlib
import logging

from celery.signals import (
    worker_process_init,
    worker_process_shutdown,
    worker_init,
    task_postrun,
)

from peek_plugin_base.worker import CeleryDbConn
from peek_plugin_base.worker.CeleryDbConn import getDbSession

logger = logging.getLogger(__name__)


class __WorkerInit:
    # Store the data for the worker processes to initialise with
    dbConnectString = None
    dbEngineArgs = None


[docs]@worker_init.connect def initWorkerConnString(sender, **kwargs): logger.info("Setting worker process database connection string") __WorkerInit.dbConnectString = sender.app.peekDbConnectString __WorkerInit.dbEngineArgs = sender.app.peekDbEngineArgs
[docs]@worker_process_init.connect def initWorkerProcessDbConn(**kwargs): logger.debug("Creating unique database connection for worker process") # The next call to CeleryDbConn.dbEngine property will create a new engine # with this connection string from peek_plugin_base.worker import CeleryDbConn CeleryDbConn = importlib.reload(CeleryDbConn) CeleryDbConn._dbConnectString = __WorkerInit.dbConnectString CeleryDbConn._dbEngineArgs = __WorkerInit.dbEngineArgs logger.info("Created unique database connection for worker process")
[docs]@worker_process_shutdown.connect def shutdownWorkerProcessDbConn(**kwargs): logger.debug("Closing database connectionn for worker.") if CeleryDbConn.__ScopedSession: getDbSession() # Ensure we have a session maker CeleryDbConn.__ScopedSession.close_all() if CeleryDbConn.__dbEngine: CeleryDbConn.__dbEngine.dispose() logger.info("Closed database connectionn for worker.")
[docs]@task_postrun.connect def taskEndCloseSession(**kwargs): CeleryDbConn.getDbSession().close()