Add Worker Service

Outline

In this document, we set up the Worker service and submit a job from the Server service. The Server service sends a random number to the worker; the worker negates the number and sends it back.

Add Package _private/worker

Create directory peek_plugin_tutorial/_private/worker

Create an empty package file in the worker directory: peek_plugin_tutorial/_private/worker/__init__.py

Commands:

mkdir peek_plugin_tutorial/_private/worker
touch peek_plugin_tutorial/_private/worker/__init__.py

Add File WorkerEntryHook.py

Create the file peek_plugin_tutorial/_private/worker/WorkerEntryHook.py and populate it with the following contents.

import logging
from peek_plugin_base.worker.PluginWorkerEntryHookABC import PluginWorkerEntryHookABC
from peek_plugin_tutorial._private.worker.tasks import RandomNumber

logger = logging.getLogger(__name__)


class WorkerEntryHook(PluginWorkerEntryHookABC):
    def __init__(self, *args, **kwargs):
        """ Constructor """
        # Call the base class's constructor
        PluginWorkerEntryHookABC.__init__(self, *args, **kwargs)

        #: Loaded Objects, This is a list of all objects created when we start
        self._loadedObjects = []

    def load(self) -> None:
        """ Load

        This will be called when the plugin is loaded, just after the db is migrated.
        Place any custom initialisation steps here.

        """
        logger.debug("Loaded")

    def start(self):
        """ Start

        This will be called when the plugin should start, after load has been called.
        Place any custom start-up steps here.

        """
        logger.debug("Started")

    def stop(self):
        """ Stop

        This method is called by the platform to tell the peek app to shutdown and stop
        everything it's doing
        """
        # Shutdown and dereference all objects we constructed when we started
        while self._loadedObjects:
            self._loadedObjects.pop().shutdown()

        logger.debug("Stopped")

    def unload(self):
        """Unload

        This method is called after stop is called, to unload any last resources
        before the PLUGIN is unlinked from the platform

        """
        logger.debug("Unloaded")

    @property
    def celeryAppIncludes(self):
        return [RandomNumber.__name__]
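
The celeryAppIncludes property returns module names, not task functions: RandomNumber here is the imported module, so RandomNumber.__name__ is its dotted import path. Presumably the platform forwards this list to Celery so the worker imports the module and registers its tasks, although this document does not show that step. The sketch below illustrates only the __name__ behaviour, using the standard library module json.decoder as a stand-in for the RandomNumber module (an assumption made so the example runs without Peek installed).

```python
import importlib
import json.decoder as task_module  # stand-in for the RandomNumber task module

# A module's __name__ is its dotted import path
includes = [task_module.__name__]
print(includes)

# Importing by that name gives back the same module object,
# which is how a list of names can be turned into loaded task modules
imported = importlib.import_module(includes[0])
print(imported is task_module)
```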

Add Package _private/worker/tasks

Create directory peek_plugin_tutorial/_private/worker/tasks

Create an empty package file in the tasks directory, peek_plugin_tutorial/_private/worker/tasks/__init__.py

Commands:

mkdir -p peek_plugin_tutorial/_private/worker/tasks
touch peek_plugin_tutorial/_private/worker/tasks/__init__.py

Add File RandomNumber.py

Create the file peek_plugin_tutorial/_private/worker/tasks/RandomNumber.py and populate it with the following contents. This task returns the negative of the positive number it is given.

import logging
from txcelery.defer import DeferrableTask
from peek_plugin_base.worker.CeleryApp import celeryApp

logger = logging.getLogger(__name__)


@DeferrableTask
@celeryApp.task(bind=True)
def pickRandomNumber(self, item: int) -> int:
    """
    Returns the negative of the given integer
    """
    return int(item) * -1
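
Stripped of its decorators, the task body is an ordinary function, so its logic can be checked without RabbitMQ, Redis or a running worker. A minimal sketch, assuming a hypothetical stand-alone function named negate that mirrors the body above:

```python
def negate(item) -> int:
    """Mirror the task body: coerce the input to int and flip its sign."""
    return int(item) * -1


# The int() coercion means numeric strings are accepted as well
print(negate(42))
print(negate("7"))
```

Keeping the task body this small makes it easy to test in isolation before wiring it into Celery.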

Edit peek_plugin_tutorial/__init__.py

Edit the file peek_plugin_tutorial/__init__.py, and add the following:

from peek_plugin_base.worker.PluginWorkerEntryHookABC import PluginWorkerEntryHookABC
from typing import Type


def peekWorkerEntryHook() -> Type[PluginWorkerEntryHookABC]:
    from ._private.worker.WorkerEntryHook import WorkerEntryHook
    return WorkerEntryHook

Edit plugin_package.json

Edit the file peek_plugin_tutorial/plugin_package.json:

  1. Add “worker” to the requiresServices section so it looks like this:

    "requiresServices": [
        "worker"
    ]
    
  2. Add the worker section after the requiresServices section:

    "worker": {
    }
    
  3. Ensure your JSON is still valid (Your IDE may help here)

Here is an example:

{
    "plugin": {
        ...
    },
    "requiresServices": [
        "worker"
    ],
    "worker": {
    }
}

The plugin should now be ready for the worker to load.
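
If your IDE does not validate JSON, the three steps above can be checked with a few lines of Python. This is a sketch only: check_worker_enabled is a hypothetical helper, not part of the Peek platform, and it checks just the two keys this section adds.

```python
import json


def check_worker_enabled(package_json_text: str) -> list:
    """Return a list of problems found; an empty list means the checks passed."""
    try:
        data = json.loads(package_json_text)
    except json.JSONDecodeError as e:
        return ["invalid JSON: %s" % e]

    if not isinstance(data, dict):
        return ["top level of the file must be a JSON object"]

    problems = []
    if "worker" not in data.get("requiresServices", []):
        problems.append('"worker" is missing from requiresServices')
    if "worker" not in data:
        problems.append('the "worker" section is missing')
    return problems


# Hypothetical minimal file contents, for illustration only
sample = '{"plugin": {}, "requiresServices": ["worker"], "worker": {}}'
print(check_worker_enabled(sample))
```

To check the real file, read peek_plugin_tutorial/plugin_package.json and pass its text to the function.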

Running on the Worker Service

Edit ~/peek-worker.home/config.json:

  1. Ensure logging.level is set to “DEBUG”
  2. Add “peek_plugin_tutorial” to the plugin.enabled array

Note

It would be helpful if this is the only plugin enabled at this point.

It should look something like this:

{
    ...
    "logging": {
        "level": "DEBUG"
    },
    ...
    "plugin": {
        "enabled": [
            "peek_plugin_tutorial"
        ],
        ...
    },
    ...
}

Note

This file is created in Administration
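
The two config.json changes can also be expressed as a small transformation on the parsed configuration. The sketch below is illustrative only: enable_plugin is a hypothetical helper, and it works on a plain dictionary rather than reading or writing ~/peek-worker.home/config.json.

```python
import copy


def enable_plugin(config: dict, plugin_name: str) -> dict:
    """Return a copy of config with DEBUG logging and plugin_name enabled."""
    updated = copy.deepcopy(config)

    # Step 1: set logging.level to DEBUG
    updated.setdefault("logging", {})["level"] = "DEBUG"

    # Step 2: add the plugin to plugin.enabled, avoiding duplicates
    enabled = updated.setdefault("plugin", {}).setdefault("enabled", [])
    if plugin_name not in enabled:
        enabled.append(plugin_name)

    return updated


# Hypothetical starting configuration, for illustration only
before = {"logging": {"level": "INFO"}, "plugin": {"enabled": []}}
after = enable_plugin(before, "peek_plugin_tutorial")
print(after)
```

Working on a copy leaves the original dictionary untouched, which makes it easy to compare the configuration before and after.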


You can now run the peek worker. You should see your plugin load.

peek@peek:~$ run_peek_worker
...
DEBUG peek_plugin_tutorial._private.worker.WorkerEntryHook:Loaded
DEBUG peek_plugin_tutorial._private.worker.WorkerEntryHook:Started
...

Push work from server to worker service

Note

Ensure the RabbitMQ and Redis services are running.

Create peek_plugin_tutorial/_private/server/controller/RandomNumberWorkerController.py with the following content:

import logging
from twisted.internet import task, reactor, defer
from twisted.internet.defer import inlineCallbacks
from vortex.DeferUtil import deferToThreadWrapWithLogger, vortexLogFailure
from datetime import datetime
from random import randint
import pytz

logger = logging.getLogger(__name__)


class RandomNumberWorkerController:
    """
        Random Number Worker Controller
        Periodically sends a random number to the worker to be negated
    """

    PERIOD = 5
    TASK_TIMEOUT = 60.0

    def __init__(self):
        self._pollLoopingCall = task.LoopingCall(self._poll)

    def start(self):
        d = self._pollLoopingCall.start(self.PERIOD, now=False)
        d.addCallbacks(self._timerCallback, self._timerErrback)

    def _timerErrback(self, failure):
        vortexLogFailure(failure, logger)

    def _timerCallback(self, _):
        logger.info("Timer executed successfully")

    def stop(self):
        if self._pollLoopingCall.running:
            self._pollLoopingCall.stop()

    def shutdown(self):
        self.stop()

    @inlineCallbacks
    def _poll(self):
        # Send the tasks to the peek worker
        start = randint(1, 1000)
        try:
            yield self._sendToWorker(start)
        except Exception as e:
            logger.exception(e)

    @inlineCallbacks
    def _sendToWorker(self, item):
        from peek_plugin_tutorial._private.worker.tasks.RandomNumber import pickRandomNumber
        startTime = datetime.now(pytz.utc)

        try:
            d = pickRandomNumber.delay(item)
            d.addTimeout(self.TASK_TIMEOUT, reactor)
            randomNumber = yield d
            logger.debug("Time Taken = %s, Random Number: %s" % (datetime.now(pytz.utc) - startTime, randomNumber))
        except Exception as e:
            logger.debug(" RandomNumber task failed : %s", str(e))

Edit peek_plugin_tutorial/_private/server/ServerEntryHook.py:

  1. Add the following imports at the top of the file with the other imports:

    from peek_plugin_base.server.PluginServerWorkerEntryHookABC import PluginServerWorkerEntryHookABC
    from peek_plugin_tutorial._private.server.controller.RandomNumberWorkerController import RandomNumberWorkerController
    
  2. Add PluginServerWorkerEntryHookABC to the list of inherited classes:

    class ServerEntryHook(PluginServerWorkerEntryHookABC, ...):
    
  3. Add these lines just before the logger.debug("Started") line at the end of the start() method:

    randomNumberController = RandomNumberWorkerController()
    self._loadedObjects.append(randomNumberController)
    randomNumberController.start()
    

Run run_peek_server

You can now run the peek server. You should see output like the following:

../../_images/PeekWorkerOutput.png