Add Vortex RPC

Outline

Peek has distributed services, and one plugin is usually run on more than one of these services. This can make it incredibly complicated for code within the plugin that runs on the agent service, to talk to the logic service for example.

In this document, we go through using the Vortex RPC to simplify communications between the logic and agent service.

What is RPC

RPC stands for Remote Procedure Call, essentially is allows you to call methods/functions/procedure over the network to another process.

In this example, the two processes are the Agent service and the Logic service. These are completely separate processes, so you can’t just call a method defined in the logic service from the agent service.

Vortex RPC provides wrappers that make it easy to define procedures in one service, and call them from another.

Note

Vortex RPC calls return twisted.internet.defer.Deferred, regardless of what the actual method returns.

../../_images/LearnRPC_Diagram.png

Logic Service RPC Setup

In this section we setup the files required to define an RPC on the logic service that will only accept calls from the agent.

The RPC example could be much simpler, the intention is to show more of a good design verses the bare minimum RPC example.

../../_images/LearnRPC_AgentToLogic.png

Add Package agent_handlers

The agent_handlers python package will contain the classes that generate tuple data to send via the observable.


Create the peek_plugin_tutorial/_private/logic/agent_handlers package, with the commands

mkdir peek_plugin_tutorial/_private/logic/agent_handlers
touch peek_plugin_tutorial/_private/logic/agent_handlers/__init__.py

Add File RpcForAgent.py

File RpcForAgent.py defines the methods the agent will call via RPC.

In this example we have just one file, however it it will be good practice to have multiple files if the require RPC methods grow too large.


Create the file peek_plugin_tutorial/_private/logic/agent_handlers/RpcForAgent.py and populate it with the following contents.

import logging

from peek_plugin_base.PeekVortexUtil import peekServerName, peekAgentName
from peek_plugin_tutorial._private.PluginNames import tutorialFilt
from peek_plugin_tutorial._private.logic.controller.MainController import MainController
from peek_plugin_tutorial._private.storage.StringIntTuple import StringIntTuple
from vortex.rpc.RPC import vortexRPC

logger = logging.getLogger(__name__)


class RpcForAgent:
    def __init__(self, mainController: MainController, dbSessionCreator):
        self._mainController = mainController
        self._dbSessionCreator = dbSessionCreator

    def makeHandlers(self):
        """ Make Handlers

        In this method we start all the RPC handlers
        start() returns an instance of itself so we can simply yield the result
        of the start method.

        """

        yield self.addInts.start(funcSelf=self)
        yield self.updateStatus.start(funcSelf=self)
        yield self.addStringInt.start(funcSelf=self)
        logger.debug("RPCs started")

    # -------------
    @vortexRPC(peekServerName,
               acceptOnlyFromVortex=peekAgentName, additionalFilt=tutorialFilt)
    def addInts(self, val1, kwval1=9):
        """ Add Ints

        This is the simplest RPC example possible

        """
        return val1 + kwval1

    # -------------
    @vortexRPC(peekServerName,
               acceptOnlyFromVortex=peekAgentName, additionalFilt=tutorialFilt)
    def updateStatus(self, updateStr: str):
        """ Update Status

        The agent may be running something and send updates on occasion,
        tell these to the main controller, it can deal with them.

        """
        self._mainController.agentNotifiedOfUpdate(updateStr)

    # -------------
    @vortexRPC(peekServerName, acceptOnlyFromVortex=peekAgentName,
               additionalFilt=tutorialFilt, deferToThread=True)
    def addStringInt(self, stringInt: StringIntTuple):
        """ Insert a stringInt

        In this example RPC method, The agent tells the logic service to insert data into
        the database.

        It's a better design get the main controller to do things like this.
        It will know what else needs updating after the insert (IE, The observable)

        Notice the :code:`deferToThread=True` argument in :code:`@vortexRPC`?
        Because this code is blocking code, not written for twisted, we need to
        defer it to a thread so it doesn't block twisteds main reactor.

        As it's no longer in the twisted thread, all the code in this method
        should be standard blocking code.

        """
        session = self._dbSessionCreator()
        try:
            session.add(stringInt)

        except:
            session.rollback()
            raise

        finally:
            session.close()

Edit File MainController.py

We need to update MainController.py, to add an example method that the RpcForAgent will call.


Edit the file peek_plugin_tutorial/_private/logic/controller/MainController.py:

  1. Add this line to the bottom of the file, inside the class definition:

    def agentNotifiedOfUpdate(self, updateStr):
        logger.debug("Agent said : %s", updateStr)
    

Edit File LogicEntryHook.py

We need to update LogicEntryHook.py, to initialise the RpcForAgent.


Edit the file peek_plugin_tutorial/_private/logic/LogicEntryHook.py:

  1. Add this import at the top of the file with the other imports:

    from .agent_handlers.RpcForAgent import RpcForAgent
    
  2. Add this line just before the logger.debug("Started") line at the end of the start() method:

    # Initialise the RpcForAgent
    self._loadedObjects.extend(RpcForAgent(mainController, self.dbSessionCreator)
                               .makeHandlers())
    

The sever side RPC is now setup.

Agent Calling Logic Service RPC

This section implements the code in the agent that will call the RPC methods that the logic service has defined.

Add File AgentToLogicRpcCallExample.py

File AgentToLogicRpcCallExample.py defines the methods the agent will call via RPC.

In this example we have just one file, however it it will be good practice to have multiple files if the require RPC methods grow too large.


Create the file peek_plugin_tutorial/_private/agent/AgentToLogicRpcCallExample.py and populate it with the following contents.

import logging

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks

from peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent import RpcForAgent
from peek_plugin_tutorial._private.storage.StringIntTuple import StringIntTuple

logger = logging.getLogger(__name__)


class AgentToLogicRpcCallExample:
    def start(self):
        # kickoff the example
        # Tell the reactor to start it in 5 seconds, we shouldn't do things like
        # this in the plugins start method.
        reactor.callLater(5, self.runWithInlineCallback)

        # Return self, to make it simpler for the AgentEntryHook
        return self

    @inlineCallbacks
    def runWithInlineCallback(self):
        """ Run With Inline Callbacks

        To understand what the :code:`@inlineCallbacks` decorator does, you can read
        more in the twisted documentation.

        This is the simplest way to go with asynchronous code.

        Yield here, will cause the flow of code to return to the twisted.reactor
        until the deferreds callback or errback is called.

        The errback will cause an exception, which we'd catch with a standard
        try/except block.

        """

        # The :code:`@vortexRPC` decorator wraps the :code:`RpcForAgent.updateStatus`
        # method with an instance of the :code:`_VortexRPC` class,
        # this class has a :code:`__call__` method implemented, that is what we're
        # calling here.
        #
        # So although it looks like we're trying to call a class method, that's not what's
        # happening.
        yield RpcForAgent.updateStatus("Agent RPC Example Started")

        seedInt = 5
        logger.debug("seedInt = %s", seedInt)

        for _ in range(5):
            seedInt = yield RpcForAgent.addInts(seedInt, kwval1=7)
            logger.debug("seedInt = %s", seedInt)

        # Move onto the run method.
        # We don't use yield here, so :code:`runWithInlineCallback` will continue on and
        # finish
        self.run()
        logger.debug("runWithInlineCallback finished")

    def run(self):
        """ Run

        In this method, we call some RPCs and handle the deferreds.

        We won't be using @inlineCallbacks here. We will setup all the calls and
        callbacks, then the run method will return. The calls and callbacks will happen
        long after this method finishes.

        """

        stringInt = StringIntTuple(int1=50, string1="Created from Agent RPC")

        d = RpcForAgent.addStringInt(stringInt)

        # the deferred will call the lambda function,
        #   "_" will be the result of "addStringInt, which we ignore
        #   the lambda function calls RpcForAgent.updateStatus,
        #   which will return a deferred
        #
        # Returning a deferred from a callback is fine, it's just merilly processed
        d.addCallback(lambda _: RpcForAgent.updateStatus("Agent RPC Example Completed"))

        # Unless you have a good reason, always return the last deferred.
        return d

    def shutdown(self):
        pass

Edit File AgentEntryHook.py

We need to update AgentEntryHook.py, to initialise the AgentToLogicRpcCallExample.


Edit the file peek_plugin_tutorial/_private/agent/AgentEntryHook.py:

  1. Add this import at the top of the file with the other imports:

    from .AgentToLogicRpcCallExample import AgentToLogicRpcCallExample
    
  2. Add this line just before the logger.debug("Started") line at the end of the start() method:

    # Initialise and start the AgentToLogicRpcCallExample
    self._loadedObjects.append(AgentToLogicRpcCallExample().start())
    

The agent will now call the logic service RPC methods.

Agent RPC Setup

In this section we setup the files required to define an RPC on the agent that the logic service will call.

Some example use cases would be: * Agent to query data from external DB * Agent to connect to remote server via SSH and pull back some data * Agent to push an update to a corporate system via HTTP

../../_images/LearnRPC_LogicToAgent.png

Add File RpcForLogic.py

File RpcForLogic.py defines the methods the logic service will call via RPC.


Create the file peek_plugin_tutorial/_private/agent/RpcForLogic.py and populate it with the following contents.

import logging

from peek_plugin_base.PeekVortexUtil import peekAgentName
from peek_plugin_tutorial._private.PluginNames import tutorialFilt
from vortex.rpc.RPC import vortexRPC

logger = logging.getLogger(__name__)


class RpcForLogic:
    def __init__(self):
        pass

    def makeHandlers(self):
        """ Make Handlers

        In this method we start all the RPC handlers
        start() returns an instance of itself so we can simply yield the result
        of the start method.

        """

        yield self.subInts.start(funcSelf=self)
        logger.debug("LogicService RPCs started")

    # -------------
    @vortexRPC(peekAgentName, additionalFilt=tutorialFilt)
    def subInts(self, val1, kwval1=9):
        """ Add Ints

        This is the simplest RPC example possible.

        :param val1: A value to start with
        :param kwval1: The value to subtract
        :return: One value minus the other

        """
        return val1 - kwval1

Edit File AgentEntryHook.py

We need to update AgentEntryHook.py, to initialise the RpcForLogic.


Edit the file peek_plugin_tutorial/_private/agent/AgentEntryHook.py:

  1. Add this import at the top of the file with the other imports:

    from .RpcForLogic import RpcForLogic
    
  2. Add this line just before the logger.debug("Started") line at the end of the start() method:

    # Initialise and start the RPC for Logic Service
    self._loadedObjects.extend(RpcForLogic().makeHandlers())
    

The sever side RPC is now setup.

Logic Service Calling Agent RPC

This section implements the code in the logic service that will call the RPC methods that the agent has defined.

Add File LogicToAgentRpcCallExample.py

File LogicToAgentRpcCallExample.py defines the methods the logic service will call via RPC.


Create the file peek_plugin_tutorial/_private/logic/LogicToAgentRpcCallExample.py and populate it with the following contents.

import logging

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks

from peek_plugin_tutorial._private.agent.RpcForLogic import RpcForLogic

logger = logging.getLogger(__name__)


class LogicToAgentRpcCallExample:
    def start(self):
        # kickoff the example
        # Tell the reactor to start it in 20 seconds, we shouldn't do things like
        # this in the plugins start method.
        reactor.callLater(20, self.run)

        return self

    @inlineCallbacks
    def run(self):
        # Call the agents RPC method
        result = yield RpcForLogic.subInts(7, kwval1=5)
        logger.debug("seedInt result = %s (Should be 2)", result)

    def shutdown(self):
        pass

Edit File LogicEntryHook.py

We need to update LogicEntryHook.py, to initialise the LogicToAgentRpcCallExample.


Edit the file peek_plugin_tutorial/_private/logic/LogicEntryHook.py:

  1. Add this import at the top of the file with the other imports:

    from .LogicToAgentRpcCallExample import LogicToAgentRpcCallExample
    
  2. Add this line just before the logger.debug("Started") line at the end of the start() method:

    # Initialise and start the RPC for Logic Service
    self._loadedObjects.append(LogicToAgentRpcCallExample().start())
    

The logic service will now call the RPC method on the agent when it starts.

Testing

  1. Open a command window and run: run_peek_logic_service

  2. Open a command window and run: run_peek_agent_service

  3. Examine the logs of both command windows

run_peek_logic_service log example:

19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.updateStatus
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.logic.controller.MainController:Agent said : Agent RPC Example Started
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addStringInt
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.updateStatus
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.logic.controller.MainController:Agent said : Agent RPC Example Completed

run_peek_agent_service log example:

19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.updateStatus
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.updateStatus
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToLogicRpcCallExample:seedInt = 5
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToLogicRpcCallExample:seedInt = 12
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToLogicRpcCallExample:seedInt = 19
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToLogicRpcCallExample:seedInt = 26
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToLogicRpcCallExample:seedInt = 33
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToLogicRpcCallExample:seedInt = 40
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addStringInt
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToLogicRpcCallExample:runWithInlineCallback finished
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.addStringInt
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.updateStatus
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.logic.agent_handlers.RpcForAgent.RpcForAgent.updateStatus