Add Vortex RPC¶
Outline¶
Peek has distributed services, and one plugin is usually run on more than one of these services. This can make it incredibly complicated for code within the plugin that runs on the agent service, to talk to the server service for example.
In this document, we go through using the Vortex RPC to simplify communications between the server and agent service.
What is RPC¶
RPC stands for Remote Procedure Call, essentially is allows you to call methods/functions/procedure over the network to another process.
In this example, the two processes are the Agent service and the Server service. These are completely separate processes, so you can’t just call a method defined in the server service from the agent service.
Vortex RPC provides wrappers that make it easy to define procedures in one service, and call them from another.
Note
Vortex RPC calls return twisted.internet.defer.Deferred
, regardless
of what the actual method returns.

Server RPC Setup¶
In this section we setup the files required to define an RPC on the server that will only accept calls from the agent.
The RPC example could be much simpler, the intenftion is to show more of a good design verses the bare minimum RPC example.

Add Package agent_handlers
¶
The agent_handlers
python package will contain the classes that generate tuple
data to send via the observable.
Create the peek_plugin_tutorial/_private/server/agent_handlers
package, with
the commands
mkdir peek_plugin_tutorial/_private/server/agent_handlers
touch peek_plugin_tutorial/_private/server/agent_handlers/__init__.py
Add File RpcForAgent.py
¶
File RpcForAgent.py
defines the methods the agent will call via RPC.
In this example we have just one file, however it it will be good practice to have multiple files if the require RPC methods grow too large.
Create the file
peek_plugin_tutorial/_private/server/agent_handlers/RpcForAgent.py
and populate it with the following contents.
import logging
from peek_plugin_base.PeekVortexUtil import peekServerName, peekAgentName
from peek_plugin_tutorial._private.PluginNames import tutorialFilt
from peek_plugin_tutorial._private.server.controller.MainController import MainController
from peek_plugin_tutorial._private.storage.StringIntTuple import StringIntTuple
from vortex.rpc.RPC import vortexRPC
logger = logging.getLogger(__name__)
class RpcForAgent:
def __init__(self, mainController: MainController, dbSessionCreator):
self._mainController = mainController
self._dbSessionCreator = dbSessionCreator
def makeHandlers(self):
""" Make Handlers
In this method we start all the RPC handlers
start() returns an instance of itself so we can simply yield the result
of the start method.
"""
yield self.addInts.start(funcSelf=self)
yield self.updateStatus.start(funcSelf=self)
yield self.addStringInt.start(funcSelf=self)
logger.debug("RPCs started")
# -------------
@vortexRPC(peekServerName,
acceptOnlyFromVortex=peekAgentName, additionalFilt=tutorialFilt)
def addInts(self, val1, kwval1=9):
""" Add Ints
This is the simplest RPC example possible
"""
return val1 + kwval1
# -------------
@vortexRPC(peekServerName,
acceptOnlyFromVortex=peekAgentName, additionalFilt=tutorialFilt)
def updateStatus(self, updateStr: str):
""" Update Status
The agent may be running something and send updates on occasion,
tell these to the main controller, it can deal with them.
"""
self._mainController.agentNotifiedOfUpdate(updateStr)
# -------------
@vortexRPC(peekServerName, acceptOnlyFromVortex=peekAgentName,
additionalFilt=tutorialFilt, deferToThread=True)
def addStringInt(self, stringInt: StringIntTuple):
""" Insert a stringInt
In this example RPC method, The agent tells the server to insert data into
the database.
It's a better design get the main controller to do things like this.
It will know what else needs updating after the insert (IE, The observable)
Notice the :code:`deferToThread=True` argument in :code:`@vortexRPC`?
Because this code is blocking code, not written for twisted, we need to
defer it to a thread so it doesn't block twisteds main reactor.
As it's no longer in the twisted thread, all the code in this method
should be standard blocking code.
"""
session = self._dbSessionCreator()
try:
session.add(stringInt)
except:
session.rollback()
raise
finally:
session.close()
Edit File MainController.py
¶
We need to update MainController.py
, to add an example method that the
RpcForAgent will call.
Edit the file peek_plugin_tutorial/_private/server/controller/MainController.py
:
Add this line to the bottom of the file, inside the class definition:
def agentNotifiedOfUpdate(self, updateStr): logger.debug("Agent said : %s", updateStr)
Edit File ServerEntryHook.py
¶
We need to update ServerEntryHook.py
, to initialise the RpcForAgent.
Edit the file peek_plugin_tutorial/_private/server/ServerEntryHook.py
:
Add this import at the top of the file with the other imports:
from .agent_handlers.RpcForAgent import RpcForAgent
Add this line just before the
logger.debug("Started")
line at the end of thestart()
method:# Initialise the RpcForAgent self._loadedObjects.extend(RpcForAgent(mainController, self.dbSessionCreator) .makeHandlers())
The sever side RPC is now setup.
Agent Calling Server RPC¶
This section implements the code in the agent that will call the RPC methods that the server has defined.
Add File AgentToServerRpcCallExample.py
¶
File AgentToServerRpcCallExample.py
defines the methods the agent will
call via RPC.
In this example we have just one file, however it it will be good practice to have multiple files if the require RPC methods grow too large.
Create the file
peek_plugin_tutorial/_private/agent/AgentToServerRpcCallExample.py
and populate it with the following contents.
import logging
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent import RpcForAgent
from peek_plugin_tutorial._private.storage.StringIntTuple import StringIntTuple
logger = logging.getLogger(__name__)
class AgentToServerRpcCallExample:
def start(self):
# kickoff the example
# Tell the reactor to start it in 5 seconds, we shouldn't do things like
# this in the plugins start method.
reactor.callLater(5, self.runWithInlineCallback)
# Return self, to make it simpler for the AgentEntryHook
return self
@inlineCallbacks
def runWithInlineCallback(self):
""" Run With Inline Callbacks
To understand what the :code:`@inlineCallbacks` decorator does, you can read
more in the twisted documentation.
This is the simplest way to go with asynchronous code.
Yield here, will cause the flow of code to return to the twisted.reactor
until the deferreds callback or errback is called.
The errback will cause an exception, which we'd catch with a standard
try/except block.
"""
# The :code:`@vortexRPC` decorator wraps the :code:`RpcForAgent.updateStatus`
# method with an instance of the :code:`_VortexRPC` class,
# this class has a :code:`__call__` method implemented, that is what we're
# calling here.
#
# So although it looks like we're trying to call a class method, that's not what's
# happening.
yield RpcForAgent.updateStatus("Agent RPC Example Started")
seedInt = 5
logger.debug("seedInt = %s", seedInt)
for _ in range(5):
seedInt = yield RpcForAgent.addInts(seedInt, kwval1=7)
logger.debug("seedInt = %s", seedInt)
# Move onto the run method.
# We don't use yield here, so :code:`runWithInlineCallback` will continue on and
# finish
self.run()
logger.debug("runWithInlineCallback finished")
def run(self):
""" Run
In this method, we call some RPCs and handle the deferreds.
We won't be using @inlineCallbacks here. We will setup all the calls and
callbacks, then the run method will return. The calls and callbacks will happen
long after this method finishes.
"""
stringInt = StringIntTuple(int1=50, string1="Created from Agent RPC")
d = RpcForAgent.addStringInt(stringInt)
# the deferred will call the lambda function,
# "_" will be the result of "addStringInt, which we ignore
# the lambda function calls RpcForAgent.updateStatus,
# which will return a deferred
#
# Returning a deferred from a callback is fine, it's just merilly processed
d.addCallback(lambda _: RpcForAgent.updateStatus("Agent RPC Example Completed"))
# Unless you have a good reason, always return the last deferred.
return d
def shutdown(self):
pass
Edit File AgentEntryHook.py
¶
We need to update AgentEntryHook.py
, to initialise the
AgentToServerRpcCallExample.
Edit the file peek_plugin_tutorial/_private/agent/AgentEntryHook.py
:
Add this import at the top of the file with the other imports:
from .AgentToServerRpcCallExample import AgentToServerRpcCallExample
Add this line just before the
logger.debug("Started")
line at the end of thestart()
method:# Initialise and start the AgentToServerRpcCallExample self._loadedObjects.append(AgentToServerRpcCallExample().start())
The agent will now call the server RPC methods.
Agent RPC Setup¶
In this section we setup the files required to define an RPC on the agent that the server will call.
Some example use cases would be: * Agent to query data from external DB * Agent to connect to remote server via SSH and pull back some data * Agent to push an update to a corporate system via HTTP

Add File RpcForServer.py
¶
File RpcForServer.py
defines the methods the server will call via RPC.
Create the file
peek_plugin_tutorial/_private/agent/RpcForServer.py
and populate it with the following contents.
import logging
from peek_plugin_base.PeekVortexUtil import peekAgentName
from peek_plugin_tutorial._private.PluginNames import tutorialFilt
from vortex.rpc.RPC import vortexRPC
logger = logging.getLogger(__name__)
class RpcForServer:
def __init__(self):
pass
def makeHandlers(self):
""" Make Handlers
In this method we start all the RPC handlers
start() returns an instance of itself so we can simply yield the result
of the start method.
"""
yield self.subInts.start(funcSelf=self)
logger.debug("Server RPCs started")
# -------------
@vortexRPC(peekAgentName, additionalFilt=tutorialFilt)
def subInts(self, val1, kwval1=9):
""" Add Ints
This is the simplest RPC example possible.
:param val1: A value to start with
:param kwval1: The value to subtract
:return: One value minus the other
"""
return val1 - kwval1
Edit File AgentEntryHook.py
¶
We need to update AgentEntryHook.py
, to initialise the RpcForServer.
Edit the file peek_plugin_tutorial/_private/agent/AgentEntryHook.py
:
Add this import at the top of the file with the other imports:
from .RpcForServer import RpcForServer
Add this line just before the
logger.debug("Started")
line at the end of thestart()
method:# Initialise and start the RPC for Server self._loadedObjects.extend(RpcForServer().makeHandlers())
The sever side RPC is now setup.
Server Calling Agent RPC¶
This section implements the code in the server that will call the RPC methods that the agent has defined.
Add File ServerToAgentRpcCallExample.py
¶
File ServerToAgentRpcCallExample.py
defines the methods the server
will call via RPC.
Create the file
peek_plugin_tutorial/_private/server/ServerToAgentRpcCallExample.py
and populate it with the following contents.
import logging
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from peek_plugin_tutorial._private.agent.RpcForServer import RpcForServer
logger = logging.getLogger(__name__)
class ServerToAgentRpcCallExample:
def start(self):
# kickoff the example
# Tell the reactor to start it in 20 seconds, we shouldn't do things like
# this in the plugins start method.
reactor.callLater(20, self.run)
return self
@inlineCallbacks
def run(self):
# Call the agents RPC method
result = yield RpcForServer.subInts(7, kwval1=5)
logger.debug("seedInt result = %s (Should be 2)", result)
def shutdown(self):
pass
Edit File ServerEntryHook.py
¶
We need to update ServerEntryHook.py
, to initialise the
ServerToAgentRpcCallExample.
Edit the file peek_plugin_tutorial/_private/server/ServerEntryHook.py
:
Add this import at the top of the file with the other imports:
from .ServerToAgentRpcCallExample import ServerToAgentRpcCallExample
Add this line just before the
logger.debug("Started")
line at the end of thestart()
method:# Initialise and start the RPC for Server self._loadedObjects.append(ServerToAgentRpcCallExample().start())
The server will now call the RPC method on the agent when it starts.
Testing¶
- Open a command window and run:
run_peek_server
- Open a command window and run:
run_peek_agent
- Examine the logs of both command windows
run_peek_server
log example:
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.updateStatus
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.server.controller.MainController:Agent said : Agent RPC Example Started
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addStringInt
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC call for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.updateStatus
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.server.controller.MainController:Agent said : Agent RPC Example Completed
run_peek_agent
log example:
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.updateStatus
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.updateStatus
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToServerRpcCallExample:seedInt = 5
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToServerRpcCallExample:seedInt = 12
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToServerRpcCallExample:seedInt = 19
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToServerRpcCallExample:seedInt = 26
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToServerRpcCallExample:seedInt = 33
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addInts
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToServerRpcCallExample:seedInt = 40
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addStringInt
19-Apr-2017 09:24:42 DEBUG peek_plugin_tutorial._private.agent.AgentToServerRpcCallExample:runWithInlineCallback finished
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.addStringInt
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Calling RPC for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.updateStatus
19-Apr-2017 09:24:42 DEBUG vortex.rpc.RPC:Received RPC result for peek_plugin_tutorial._private.server.agent_handlers.RpcForAgent.RpcForAgent.updateStatus