Add Vortex RPC

Outline

Peek has distributed services, and one plugin is usually run on more than one of these services. This can make it incredibly complicated for code within the plugin that runs on the agent service, to talk to the server service for example.

In this document, we go through using the Vortex RPC to simplify communications between the server and agent service.

What is RPC

RPC stands for Remote Procedure Call, essentially is allows you to call methods/functions/procedure over the network to another process.

In this example, the two processes are the Agent service and the Server service. These are completely separate processes, so you can’t just call a method defined in the server service from the agent service.

Vortex RPC provides wrappers that make it easy to define procedures in one service, and call them from another.

Note

Vortex RPC calls return twisted.internet.defer.Deferred, regardless of what the actual method returns.

../_images/LearnRPC_Diagram.png

Server RPC Setup

In this section we setup the files required to define an RPC on the server that will only accept calls from the agent.

The RPC example could be much simpler, the intenftion is to show more of a good design verses the bare minimum RPC example.

../_images/LearnRPC_AgentToServer.png

Add Package agent_handlers

The agent_handlers python package will contain the classes that generate tuple data to send via the observable.


Create the peek_plugin_tutorial/_private/server/agent_handlers package, with the commands

mkdir peek_plugin_tutorial/_private/server/agent_handlers
touch peek_plugin_tutorial/_private/server/agent_handlers/__init__.py

Add File RpcForAgent.py

File RpcForAgent.py defines the methods the agent will call via RPC.

In this example we have just one file, however it it will be good practice to have multiple files if the require RPC methods grow too large.


Create the file peek_plugin_tutorial/_private/server/agent_handlers/RpcForAgent.py and populate it with the following contents.

import logging

from peek_plugin_base.PeekVortexUtil import peekServerName, peekAgentName
from peek_plugin_tutorial._private.PluginNames import tutorialFilt
from peek_plugin_tutorial._private.server.controller.MainController import MainController
from peek_plugin_tutorial._private.storage.StringIntTuple import StringIntTuple
from vortex.rpc.RPC import vortexRPC

logger = logging.getLogger(__name__)


class RpcForAgent:
    def __init__(self, mainController: MainController, dbSessionCreator):
        self._mainController = mainController
        self._dbSessionCreator = dbSessionCreator

    def makeHandlers(self):
        """ Make Handlers

        In this method we start all the RPC handlers
        start() returns an instance of it's self so we can simply yield the result
        of the start method.

        """

        yield self.addInts.start(funcSelf=self)
        yield self.updateStatus.start(funcSelf=self)
        yield self.addStringInt.start(funcSelf=self)
        logger.debug("RPCs started")

    # -------------
    @vortexRPC(peekServerName,
               acceptOnlyFromVortex=peekAgentName, additionalFilt=tutorialFilt)
    def addInts(self, val1, kwval1=9):
        """ Add Ints

        This is the simplest RPC example possible

        """
        return val1 + kwval1

    # -------------
    @vortexRPC(peekServerName,
               acceptOnlyFromVortex=peekAgentName, additionalFilt=tutorialFilt)
    def updateStatus(self, updateStr: str):
        """ Update Status

        The agent may be running something and send updates on occasion,
        tell these to the main controller, it can deal with them.

        """
        self._mainController.agentNotifiedOfUpdate(updateStr)

    # -------------
    @vortexRPC(peekServerName, acceptOnlyFromVortex=peekAgentName,
               additionalFilt=tutorialFilt, deferToThread=True)
    def addStringInt(self, stringInt: StringIntTuple):
        """ Insert a stringInt

        In this example RPC method, The agent tells the server to insert data into
        the database.

        It's a better design get the main controller to do things like this.
        It will know what else needs updating after the insert (IE, The observable)

        Notice the :code:`deferToThread=True` argument in :code:`@vortexRPC`?
        Because this code is blocking code, not written for twisted, we need to
        defer it to a thread so it doesn't block twisteds main reactor.

        As it's no longer in the twisted thread, all the code in this method
        should be standard blocking code.

        """
        session = self._dbSessionCreator()
        try:
            session.add(stringInt)

        except:
            session.rollback()
            raise

        finally:
            session.close()

Edit File MainController.py

We need to update MainController.py, to add an example method that the RpcForAgent will call.


Edit the file peek_plugin_tutorial/_private/server/controller/MainController.py:

  1. Add this line to the bottom of the file, inside the class definition:

    def agentNotifiedOfUpdate(self, updateStr):
        logger.debug("Agent said : %s", updateStr)
    

Edit File ServerEntryHook.py

We need to update ServerEntryHook.py, to initialise the RpcForAgent.


Edit the file peek_plugin_tutorial/_private/server/ServerEntryHook.py:

  1. Add this import at the top of the file with the other imports:

    from .agent_handlers.RpcForAgent import RpcForAgent
    
  2. Add this line just before the logger.debug("Started") line at the end of the start() method:

    # Initialise the RpcForAgent
    self._loadedObjects.extend(RpcForAgent(mainController, self.dbSessionCreator)
                               .makeHandlers())
    

The sever side RPC is now setup.

Agent Calling Server RPC

This section implements the code in the agent that will call the RPC methods that the server has defined.

Add File AgentToServerRpcCallExample.py

File AgentToServerRpcCallExample.py defines the methods the agent will call via RPC.

In this example we have just one file, however it it will be good practice to have multiple files if the require RPC methods grow too large.


Create the file peek_plugin_tutorial/_private/agent/AgentToServerRpcCallExample.py and populate it with the following contents.

import logging

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks

from peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent import RpcForAgent
from peek_plugin_tutorial._private.storage.StringIntTuple import StringIntTuple

logger = logging.getLogger(__name__)


class AgentToServerRpcCallExample:
    def start(self):
        # kickoff the example
        # Tell the reactor to start it in 5 seconds, we shouldn't do things like
        # this in the plugins start method.
        reactor.callLater(5, self.runWithInlineCallback)

        # Return self, to make it simpler for the AgentEntryHook
        return self

    @inlineCallbacks
    def runWithInlineCallback(self):
        """ Run With Inline Callbacks

        To understand what the :code:`@inlineCallbacks` decorator does, you can read
        more in the twisted documentation.

        This is the simplest way to go with asynchronous code.

        Yield here, will cause the flow of code to return to the twisted.reactor
        until the deferreds callback or errback is called.

        The errback will cause an exception, which we'd catch with a standard
        try/except block.

        """

        # The :code:`@vortexRPC` decorator wraps the :code:`RpcForAgent.updateStatus`
        # method with an instance of the :code:`_VortexRPC` class,
        # this class has a :code:`__call__` method implemented, that is what we're
        # calling here.
        #
        # So although it looks like we're trying to call a class method, that's not what's
        # happening.
        yield RpcForAgent.updateStatus("Agent RPC Example Started")

        seedInt = 5
        logger.debug("seedInt = %s", seedInt)

        for _ in range(5):
            seedInt = yield RpcForAgent.addInts(seedInt, kwval1=7)
            logger.debug("seedInt = %s", seedInt)

        # Move onto the run method.
        # We don't use yield here, so :code:`runWithInlineCallback` will continue on and
        # finish
        self.run()
        logger.debug("runWithInlineCallback finished")

    def run(self):
        """ Run

        In this method, we call some RPCs and handle the deferreds.

        We won't be using @inlineCallbacks here. We will setup all the calls and
        callbacks, then the run method will return. The calls and callbacks will happen
        long after this method finishes.

        """

        stringInt = StringIntTuple(int1=50, string1="Created from Agent RPC")

        d = RpcForAgent.addStringInt(stringInt)

        # the deferred will call the lambda function,
        #   "_" will be the result of "addStringInt, which we ignore
        #   the lambda function calls RpcForAgent.updateStatus,
        #   which will return a deferred
        #
        # Returning a deferred from a callback is fine, it's just merilly processed
        d.addCallback(lambda _: RpcForAgent.updateStatus("Agent RPC Example Completed"))

        # Unless you have a good reason, always return the last deferred.
        return d

    def shutdown(self):
        pass

Edit File AgentEntryHook.py

We need to update AgentEntryHook.py, to initialise the AgentToServerRpcCallExample.


Edit the file peek_plugin_tutorial/_private/agent/AgentEntryHook.py:

  1. Add this import at the top of the file with the other imports:

    from .AgentToServerRpcCallExample import AgentToServerRpcCallExample
    
  2. Add this line just before the logger.debug("Started") line at the end of the start() method:

    # Initialise and start the AgentToServerRpcCallExample
    self._loadedObjects.append(AgentToServerRpcCallExample().start())
    

The agent will now call the server RPC methods.

Agent RPC Setup

In this section we setup the files required to define an RPC on the agent that the server will call.

Some example use cases would be: * Agent to query data from external DB * Agent to connect to remote server via SSH and pull back some data * Agent to push an update to a corporate system via HTTP

../_images/LearnRPC_ServerToAgent.png

Add File RpcForServer.py

File RpcForServer.py defines the methods the server will call via RPC.


Create the file peek_plugin_tutorial/_private/agent/RpcForServer.py and populate it with the following contents.

import logging

from peek_plugin_base.PeekVortexUtil import peekAgentName
from peek_plugin_tutorial._private.PluginNames import tutorialFilt
from vortex.rpc.RPC import vortexRPC

logger = logging.getLogger(__name__)


class RpcForServer:
    def __init__(self):
        pass

    def makeHandlers(self):
        """ Make Handlers

        In this method we start all the RPC handlers
        start() returns an instance of it's self so we can simply yield the result
        of the start method.

        """

        yield self.subInts.start(funcSelf=self)
        logger.debug("Server RPCs started")

    # -------------
    @vortexRPC(peekAgentName, additionalFilt=tutorialFilt)
    def subInts(self, val1, kwval1=9):
        """ Add Ints

        This is the simplest RPC example possible.

        :param val1: A value to start with
        :param kwval1: The value to subtract
        :return: One value minus the other

        """
        return val1 - kwval1

Edit File AgentEntryHook.py

We need to update AgentEntryHook.py, to initialise the RpcForServer.


Edit the file peek_plugin_tutorial/_private/agent/AgentEntryHook.py:

  1. Add this import at the top of the file with the other imports:

    from .RpcForServer import RpcForServer
    
  2. Add this line just before the logger.debug("Started") line at the end of the start() method:

    # Initialise and start the RPC for Server
    self._loadedObjects.extend(RpcForServer().makeHandlers())
    

The sever side RPC is now setup.

Server Calling Agent RPC

This section implements the code in the server that will call the RPC methods that the agent has defined.

Add File ServerToAgentRpcCallExample.py

File ServerToAgentRpcCallExample.py defines the methods the server will call via RPC.


Create the file peek_plugin_tutorial/_private/server/ServerToAgentRpcCallExample.py and populate it with the following contents.

import logging

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks

from peek_plugin_tutorial._private.agent.RpcForServer import RpcForServer

logger = logging.getLogger(__name__)


class ServerToAgentRpcCallExample:
    def start(self):
        # kickoff the example
        # Tell the reactor to start it in 20 seconds, we shouldn't do things like
        # this in the plugins start method.
        reactor.callLater(20, self.run)

        return self

    @inlineCallbacks
    def run(self):
        # Call the agents RPC method
        result = yield RpcForServer.subInts(7, kwval1=5)
        logger.debug("seedInt result = %s (Should be 2)", result)

    def shutdown(self):
        pass

Edit File ServerEntryHook.py

We need to update ServerEntryHook.py, to initialise the ServerToAgentRpcCallExample.


Edit the file peek_plugin_tutorial/_private/server/ServerEntryHook.py:

  1. Add this import at the top of the file with the other imports:

    from .ServerToAgentRpcCallExample import ServerToAgentRpcCallExample
    
  2. Add this line just before the logger.debug("Started") line at the end of the start() method:

    # Initialise and start the RPC for Server
    self._loadedObjects.append(ServerToAgentRpcCallExample().start())
    

The server will now call the RPC method on the agent when it starts.

Testing

  1. Open a command window and run: run_peek_server
  2. Open a command window and run: run_peek_agent
  3. Examine the logs of both command windows

run_peek_server log example:

19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.updateStatus
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.server.controller.MainController:Agent said : Agent RPC Example Started
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addStringInt
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.updateStatus
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.server.controller.MainController:Agent said : Agent RPC Example Completed

run_peek_agent log example:

19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.updateStatus
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.updateStatus
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToServerRpcCallExample:seedInt = 5
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToServerRpcCallExample:seedInt = 12
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToServerRpcCallExample:seedInt = 19
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToServerRpcCallExample:seedInt = 26
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToServerRpcCallExample:seedInt = 33
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToServerRpcCallExample:seedInt = 40
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addStringInt
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToServerRpcCallExample:runWithInlineCallback finished
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addStringInt
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.updateStatus
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.updateStatus