Source code for peek_plugin_base.simple_subproc.simple_subproc_parent_protocol

import json
import logging
from base64 import b64decode
from base64 import b64encode

from twisted.internet import protocol
from twisted.internet.defer import Deferred
from twisted.internet.defer import inlineCallbacks
from vortex.DeferUtil import deferToThreadWrapWithLogger

from .simple_subproc_task_call_tuple import SimpleSubprocTaskCallTuple
from .simple_subproc_task_constructor_tuple import (
    SimpleSubprocTaskConstructorTuple,
)
from .simple_subproc_task_result_tuple import SimpleSubprocTaskResultTuple

logger = logging.getLogger("simple_subproc_parent_protocol")


[docs]class SimpleSubprocParentProtocol(protocol.ProcessProtocol): def __init__(self, constructorTuple: SimpleSubprocTaskConstructorTuple): self._constructorTuple = constructorTuple self._data = b"" self._logData = b"" self._deferredByUuid = {}
[docs] def connectionMade(self): self.transport.write( b64encode(json.dumps(self._constructorTuple.toJsonDict()).encode()) ) self.transport.write(b".")
[docs] def errReceived(self, data: bytes): self._logData += data while b"\n" in self._logData: message, self._logData = self._logData.split(b"\n", 1) if not message: continue message = message.decode() if ":" not in message: logger.error(message) else: severity, logMsg = message.split(":", 1) if severity == "DEBUG": logger.debug(logMsg) elif severity == "INFO": logger.info(logMsg) elif severity == "WARNING": logger.warning(logMsg) elif severity == "ERROR": logger.error(logMsg) else: logger.error(message)
[docs] @inlineCallbacks def outReceived(self, data): self._data += data while b"." in self._data: message, self._data = self._data.split(b".", 1) if not message: continue resultTuple = yield self._decodeResult(message) d = self._deferredByUuid.pop(resultTuple.commandUuid) if resultTuple.exceptionStr: d.errback(Exception(resultTuple.exceptionStr)) else: d.callback(resultTuple.result)
[docs] @inlineCallbacks def queueCommand(self, command: SimpleSubprocTaskCallTuple) -> Deferred: d = Deferred() self._deferredByUuid[command.commandUuid] = d commandMessage = yield self._encodeCall(command) self.transport.write(commandMessage) self.transport.write(b".") return (yield d)
@deferToThreadWrapWithLogger(logger) def _encodeCall(self, command): return b64encode(json.dumps(command.toJsonDict()).encode()) @deferToThreadWrapWithLogger(logger) def _decodeResult(self, message): return SimpleSubprocTaskResultTuple().fromJsonDict( json.loads(b64decode(message)) )