Add Worker Service¶
Outline¶
In this document, we setup the Worker service and submit a job from the Logic service. The Logic service will send a random number to worker and, the worker inverts the number and sends it back.
Add Package _private/worker
¶
Create directory peek_plugin_tutorial/_private/worker
Create an empty package file in the worker directory:
peek_plugin_tutorial/_private/worker/__init__.py
Commands:
mkdir peek_plugin_tutorial/_private/worker
touch peek_plugin_tutorial/_private/worker/__init__.py
Add File WorkerEntryHook.py
¶
Create the file peek_plugin_tutorial/_private/worker/EntryHook.py
and populate it with the following contents.
import logging
from peek_plugin_base.worker.WorkerEntryHookABC import WorkerEntryHookABC
from peek_plugin_tutorial._private.worker.tasks import RandomNumber
logger = logging.getLogger(__name__)
class WorkerEntryHook(PluginWorkerEntryHookABC):
def __init__(self, *args, **kwargs):
"""" Constructor """
# Call the base classes constructor
WorkerEntryHookABC.__init__(self, *args, **kwargs)
#: Loaded Objects, This is a list of all objects created when we start
self._loadedObjects = []
def load(self) -> None:
""" Load
This will be called when the plugin is loaded, just after the db is migrated.
Place any custom initialiastion steps here.
"""
logger.debug("Loaded")
def start(self):
""" Load
This will be called when the plugin is loaded, just after the db is migrated.
Place any custom initialiastion steps here.
"""
logger.debug("Started")
def stop(self):
""" Stop
This method is called by the platform to tell the peek app to shutdown and stop
everything it's doing
"""
# Shutdown and dereference all objects we constructed when we started
while self._loadedObjects:
self._loadedObjects.pop().shutdown()
logger.debug("Stopped")
def unload(self):
"""Unload
This method is called after stop is called, to unload any last resources
before the PLUGIN is unlinked from the platform
"""
logger.debug("Unloaded")
@property
def celeryAppIncludes(self):
return [RandomNumber.__name__]
Add Package _private/worker/tasks
¶
Create directory _private/worker/tasks
Create an empty package file in the tasks directory,
peek_plugin_tutorial/_private/worker/tasks/__init__.py
Commands:
mkdir -p peek_plugin_tutorial/_private/worker/tasks
touch peek_plugin_tutorial/_private/worker/tasks/__init__.py
Add File RandomNumber.py
¶
Create the file peek_plugin_tutorial/_private/worker/tasks/RandomNumber.py
and populate it with the following contents. This worker returns the negative number
for the given positive number
import logging
from random import randint
from txcelery.defer import DeferrableTask
from peek_plugin_base.worker.CeleryApp import celeryApp
logger = logging.getLogger(__name__)
@DeferrableTask
@celeryApp.task(bind=True)
def pickRandomNumber(self, item: int) -> int:
"""
Returns random integer between 1 to 1000
"""
return int(item) * -1
Edit peek_plugin_tutorial/__init__.py
¶
Edit the file peek_plugin_tutorial/__init__.py
, and add the following:
from peek_plugin_base.worker.PluginWorkerEntryHookABC import PluginWorkerEntryHookABC
from typing import Type
def peekWorkerEntryHook() -> Type[PluginWorkerEntryHookABC]:
from ._private.worker.WorkerEntryHook import WorkerEntryHook
return WorkerEntryHook
Edit plugin_package.json
¶
Edit the file peek_plugin_tutorial/plugin_package.json
:
Add “worker” to the requiresServices section so it looks like
"requiresServices": [ "worker" ]
Add the worker section after requiresServices section:
"worker": { }
Ensure your JSON is still valid (Your IDE may help here)
Here is an example
{
"plugin": {
...
},
"requiresServices": [
"worker"
],
"worker": {
}
}
The plugin should now be ready for the worker to load.
Running on the Worker Service¶
Edit ~/peek-worker-service.home/config.json
:
- Ensure logging.level is set to “DEBUG”
- Add “peek_plugin_tutorial” to the plugin.enabled array
Note
It would be helpful if this is the only plugin enabled at this point.
It should somthing like this:
{
...
"logging": {
"level": "DEBUG"
},
...
"plugin": {
"enabled": [
"peek_plugin_tutorial"
],
...
},
...
}
Note
This file is created in Administration
You can now run the peek worker, you should see your plugin load.
peek@_peek:~$ run_peek_worker_service
...
DEBUG peek_plugin_tutorial._private.worker.WorkerEntryHook:Loaded
DEBUG peek_plugin_tutorial._private.worker.WorkerEntryHook:Started
...
Push work from logic to worker service¶
Note
Ensure rabbitmq
and redis
services are running
Create peek_plugin_tutorial/_private/logic/controller/RandomNumberWorkerController.py
with below content:
import logging
from twisted.internet import task, reactor, defer
from twisted.internet.defer import inlineCallbacks
from vortex.DeferUtil import deferToThreadWrapWithLogger, vortexLogFailure
from datetime import datetime
from random import randint
import pytz
logger = logging.getLogger(__name__)
class RandomNumberWorkerController:
"""
Random Number Generator
Generates random number on worker periodically
"""
PERIOD = 5
TASK_TIMEOUT = 60.0
def __init__(self):
self._pollLoopingCall = task.LoopingCall(self._poll)
def start(self):
d = self._pollLoopingCall.start(self.PERIOD, now=False)
d.addCallbacks(self._timerCallback, self._timerErrback)
def _timerErrback(self, failure):
vortexLogFailure(failure, logger)
def _timerCallback(self, _):
logger.info("Time executed successfully")
def stop(self):
if self._pollLoopingCall.running:
self._pollLoopingCall.stop()
def shutdown(self):
self.stop()
@inlineCallbacks
def _poll(self):
# Send the tasks to the peek worker
start = randint(1, 1000)
try:
result = yield self._sendToWorker(start)
catch Exception as e:
logger.exception(e)
@inlineCallbacks
def _sendToWorker(self, item):
from peek_plugin_tutorial._private.worker.tasks.RandomNumber import pickRandomNumber
startTime = datetime.now(pytz.utc)
try:
d = pickRandomNumber.delay(item)
d.addTimeout(self.TASK_TIMEOUT, reactor)
randomNumber = yield d
logger.debug("Time Taken = %s, Random Number: %s" % (datetime.now(pytz.utc) - startTime, randomNumber))
except Exception as e:
logger.debug(" RandomNumber task failed : %s", str(e))
Edit peek_plugin_tutorial/_private/logic/LogicEntryHook.py
:
Add the following imports at the top of the file with the other imports:
from peek_plugin_base.logic.PluginLogicWorkerEntryHookABC import PluginLogicWorkerEntryHookABC from peek_plugin_tutorial._private.logic.controller.RandomNumberWorkerController import RandomNumberWorkerController
Add
PluginLogicWorkerEntryHookABC
to list of inherited class:class LogicWorkerEntryHook(PluginLogicWorkerEntryHookABC, ...):
Add this line just before the
logger.debug("Started")
line at the end of thestart()
method:randomNumberController = RandomNumberWorkerController() self._loadedObjects.append(randomNumberController) randomNumberController.start()
Run run_peek_logic_service
¶
You can now run the peek logic service, you should see output like below, showing the :
