import json
import logging
from base64 import b64decode
from base64 import b64encode
from twisted.internet import protocol
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.protocol import connectionDone
from twisted.python import failure
from peek_plugin_base.simple_subproc.simple_subproc_task_call_tuple import (
SimpleSubprocTaskCallTuple,
)
from peek_plugin_base.simple_subproc.simple_subproc_task_constructor_tuple import (
SimpleSubprocTaskConstructorTuple,
)
from peek_plugin_base.simple_subproc.simple_subproc_task_result_tuple import (
SimpleSubprocTaskResultTuple,
)
logger = logging.getLogger("simple_subproc_child_protocol")
[docs]class SimpleSubprocChildProtocol(protocol.Protocol):
def __init__(self, TaskClass):
self._data = b""
self._delegate = None
self._Delegate = TaskClass
[docs] @inlineCallbacks
def dataReceived(self, data: bytes):
self._data += data
while b"." in self._data:
message, self._data = self._data.split(b".", 1)
if not message:
continue
if not self._delegate:
logger.debug("Received constructor data")
self._constructClass(message)
continue
yield self._runTask(message)
[docs] def connectionLost(self, reason: failure.Failure = connectionDone):
logger.debug("STDIN closed, we'll stop the reactor in 5 seconds")
reactor.callLater(5, reactor.stop)
def _constructClass(self, message: bytes):
constructorTuple = SimpleSubprocTaskConstructorTuple().fromJsonDict(
json.loads(b64decode(message))
)
self._delegate = self._Delegate(**constructorTuple.kwargs)
@inlineCallbacks
def _runTask(self, message: bytes):
# logging.info("Received load data")
importCommandTuple = SimpleSubprocTaskCallTuple().fromJsonDict(
json.loads(b64decode(message))
)
try:
result = yield self._delegate.run(**importCommandTuple.kwargs)
resultTuple = SimpleSubprocTaskResultTuple(
commandUuid=importCommandTuple.commandUuid, result=result
)
except Exception as e:
logger.exception(str(e))
resultTuple = SimpleSubprocTaskResultTuple(
commandUuid=importCommandTuple.commandUuid, exceptionStr=str(e)
)
response = b64encode(json.dumps(resultTuple.toJsonDict()).encode())
self.transport.write(response)
self.transport.write(b".")