from logging import Logger
from typing import Any, Optional, Callable
from peek_plugin_base.storage.DbConnection import DbSessionCreator
from vortex.DeferUtil import deferToThreadWrapWithLogger
[docs]def runPyInPg(
logger: Logger,
dbSessionCreator: DbSessionCreator,
classMethodToRun: Callable,
classMethodToImportTuples: Optional[Callable] = None,
*args,
**kwargs
) -> Any:
return deferToThreadWrapWithLogger(logger)(runPyInPgBlocking)(
dbSessionCreator,
classMethodToRun,
classMethodToImportTuples,
*args,
**kwargs
)
[docs]def runPyInPgBlocking(
dbSessionCreator: DbSessionCreator,
classMethodToRun: Any,
classMethodToImportTuples: Optional[Callable] = None,
*args,
**kwargs
) -> Any:
from peek_storage_service.plpython.RunPyInPg import runPyInPgBlocking
return runPyInPgBlocking(
dbSessionCreator,
classMethodToRun,
classMethodToImportTuples,
*args,
**kwargs
)