Source code for peek_plugin_base.worker.PluginWorkerEntryHookABC

from abc import abstractproperty

from celery.app.base import Celery
from peek_plugin_base.PluginCommonEntryHookABC import PluginCommonEntryHookABC
from peek_plugin_base.worker.PeekWorkerPlatformHookABC import (
    PeekWorkerPlatformHookABC,
)


[docs]class PluginWorkerEntryHookABC(PluginCommonEntryHookABC): def __init__( self, pluginName: str, pluginRootDir: str, platform: PeekWorkerPlatformHookABC, ): PluginCommonEntryHookABC.__init__( self, pluginName=pluginName, pluginRootDir=pluginRootDir ) self._platform = platform @property def platform(self) -> PeekWorkerPlatformHookABC: return self._platform @abstractproperty def celeryAppIncludes(self) -> [str]: """Celery App Includes This property returns the absolute package paths to the modules with the tasks :Example: ["plugin_noop.worker.NoopWorkerTask"] :return: A list of package+module names that Celery should import. """