(P) storage

(M) AlembicEnvBase

class peek_plugin_base.storage.AlembicEnvBase.AlembicEnvBase(targetMetadata)

Bases: object


Run migrations in ‘online’ mode. In this scenario we need to create an Engine and associate a connection with the context.

peek_plugin_base.storage.AlembicEnvBase.ensureSchemaExists(engine, schemaName)

(M) DbConnection

class peek_plugin_base.storage.DbConnection.DbConnection(dbConnectString: str, metadata: sqlalchemy.sql.schema.MetaData, alembicDir: str, dbEngineArgs: Optional[Dict[str, Union[str, int]]] = None, enableForeignKeys=False, enableCreateAll=True)

Bases: object

SQLAlchemy Database Connection

This class takes care of migrating the database and establishing the database connections and ORM sessions.

  • dbConnectString – The connection string for the DB. See http://docs.sqlalchemy.org/en/latest/core/engines.html
  • metadata – The metadata instance for this connection. This is schema qualified, e.g. MetaData(schema="schema_name")
  • alembicDir – The absolute location of the alembic directory (the versions directory lives under this)
  • dbEngineArgs – The arguments to pass to the database engine. See http://docs.sqlalchemy.org/en/latest/core/engines.html#engine-creation-api
  • enableCreateAll – If the schema doesn’t exist, then the migration is allowed to use metadata.create_all()
  • enableForeignKeys – Perform a check to ensure foreign keys have indexes after the DB is migrated and connected.

checkForeignKeys(engine: sqlalchemy.engine.base.Engine) → None

Check Foreign Keys

Log any foreign keys that don’t have indexes assigned to them. Foreign keys without indexes are a performance issue.
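As a rough illustration of the kind of check described above, the following sketch lists foreign key columns that are not the leading column of any index. It is not the checkForeignKeys implementation: it uses the standard library sqlite3 module instead of a SQLAlchemy engine, and the function name unindexed_foreign_keys and the demo tables are hypothetical.

```python
import logging
import sqlite3
from typing import List, Tuple

logger = logging.getLogger(__name__)


def unindexed_foreign_keys(conn: sqlite3.Connection) -> List[Tuple[str, str]]:
    """Return (table, column) pairs for foreign key columns with no index.

    Hypothetical helper: a column counts as indexed only when it is the
    leading column of some index on its table.
    """
    missing = []
    tables = [row[0] for row in conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    ).fetchall()]
    for table in tables:
        # Collect the leading column of every index defined on this table.
        leading = set()
        for index_row in conn.execute(f'PRAGMA index_list("{table}")').fetchall():
            index_cols = conn.execute(
                f'PRAGMA index_info("{index_row[1]}")').fetchall()
            if index_cols:
                # Rows are (seqno, cid, name); take the lowest seqno.
                leading.add(min(index_cols, key=lambda r: r[0])[2])
        # Column 3 of foreign_key_list is the referencing ("from") column.
        for fk_row in conn.execute(f'PRAGMA foreign_key_list("{table}")').fetchall():
            if fk_row[3] not in leading:
                missing.append((table, fk_row[3]))
    return missing


# Demo schema (assumed for illustration): child_a has no index on its
# foreign key column, child_b does.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE parent (id INTEGER PRIMARY KEY);
    CREATE TABLE child_a (id INTEGER PRIMARY KEY,
                          parent_id INTEGER REFERENCES parent(id));
    CREATE TABLE child_b (id INTEGER PRIMARY KEY,
                          parent_id INTEGER REFERENCES parent(id));
    CREATE INDEX idx_child_b_parent ON child_b (parent_id);
""")

missing = unindexed_foreign_keys(conn)
for table, column in missing:
    logger.warning("Foreign key %s.%s has no index", table, column)
```

Without an index on the referencing column, deleting or updating a parent row generally forces the database to scan the child table, which is why a check like this is worth logging.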


Close All Sessions

Close all ORM sessions connected to this DB engine.


Get DB Engine

This is not thread safe. Use the ormSession to execute SQL statements instead: self.ormSession.execute(…)

Returns: the DB Engine used to connect to the database.

migrate() → None


Perform a database migration, upgrading to the latest schema level.

peek_plugin_base.storage.DbConnection.convertToCoreSqlaInsert(ormObj, Declarative)
peek_plugin_base.storage.DbConnection.pgCopyInsert(rawConn, table, inserts)

(M) LoadPayloadPgUtil

class peek_plugin_base.storage.LoadPayloadPgUtil.LoadPayloadTupleResult(count, encodedPayload)

Bases: tuple

Create new instance of LoadPayloadTupleResult(count, encodedPayload)


count

Alias for field number 0


encodedPayload

Alias for field number 1
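The "Bases: tuple" and "Alias for field number N" entries are the standard documentation of a named tuple, so each field can be read by name or by position. A minimal sketch of that behaviour follows. It uses a stand-in built with collections.namedtuple rather than importing the real class, and the payload value is a placeholder, since the type of encodedPayload is not documented here.

```python
from collections import namedtuple

# Stand-in with the documented field names; in real code the class would be
# imported from peek_plugin_base.storage.LoadPayloadPgUtil.
LoadPayloadTupleResult = namedtuple(
    "LoadPayloadTupleResult", ["count", "encodedPayload"]
)

# Placeholder values for illustration only.
result = LoadPayloadTupleResult(count=2, encodedPayload=b"example-bytes")

# Field 0 is "count" and field 1 is "encodedPayload".
assert result.count == result[0]
assert result.encodedPayload == result[1]

# Because it is a tuple, it also unpacks positionally.
count, encodedPayload = result
```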

peek_plugin_base.storage.LoadPayloadPgUtil.getTuplesPayload(logger: logging.Logger, dbSessionCreator: Callable[[], sqlalchemy.orm.session.Session], sql: sqlalchemy.sql.selectable.Select, sqlCoreLoadTupleClassmethod: Callable, payloadFilt: Optional[Dict] = None, fetchSize=50) → twisted.internet.defer.Deferred
peek_plugin_base.storage.LoadPayloadPgUtil.getTuplesPayloadBlocking(dbSessionCreator: Callable[[], sqlalchemy.orm.session.Session], sql: sqlalchemy.sql.selectable.Select, sqlCoreLoadTupleClassmethod: Callable, payloadFilt: Optional[Dict] = None, fetchSize=50) → peek_plugin_base.storage.LoadPayloadPgUtil.LoadPayloadTupleResult

(M) RunPyInPg

peek_plugin_base.storage.RunPyInPg.runPyInPg(logger: logging.Logger, dbSessionCreator: Callable[[], sqlalchemy.orm.session.Session], classMethodToRun: Callable, classMethodToImportTuples: Optional[Callable] = None, *args, **kwargs) → Any
peek_plugin_base.storage.RunPyInPg.runPyInPgBlocking(dbSessionCreator: Callable[[], sqlalchemy.orm.session.Session], classMethodToRun: Any, classMethodToImportTuples: Optional[Callable] = None, *args, **kwargs) → Any

(M) StorageUtil

peek_plugin_base.storage.StorageUtil.makeCoreValuesSubqueryCondition(engine, column, values: List[Union[int, str]])

Make Core Values Subquery

  • engine – The database engine, used to determine the dialect
  • column – The column, eg TableItem.__table__.c.colName
  • values – A list of string or int values

peek_plugin_base.storage.StorageUtil.makeOrmValuesSubqueryCondition(ormSession, column, values: List[Union[int, str]])

Make Orm Values Subquery

  • ormSession – The orm session instance
  • column – The column from the Declarative table, eg TableItem.colName
  • values – A list of string or int values
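The entries above name the two helpers but do not show the SQL they generate. As a sketch of the general technique the names suggest, the example below filters a column against a list of values supplied as a VALUES subquery. This is an assumption about intent, not the library's implementation: the real helpers return SQLAlchemy conditions and may produce different SQL per dialect. The function values_subquery_condition and the item table are hypothetical, and the example runs on the standard library sqlite3 module.

```python
import sqlite3
from typing import List, Tuple, Union


def values_subquery_condition(
    column: str, values: List[Union[int, str]]
) -> Tuple[str, List[Union[int, str]]]:
    """Build 'column IN (VALUES (?), (?), ...)' plus its bind parameters.

    The column name is interpolated directly, so it must be a trusted
    identifier; the values themselves are always passed as parameters.
    """
    if not values:
        # An empty VALUES clause is not valid SQL, so match nothing instead.
        return "0 = 1", []
    placeholders = ", ".join("(?)" for _ in values)
    return f"{column} IN (VALUES {placeholders})", list(values)


# Demo table (assumed for illustration).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE item (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO item (id, name) VALUES (?, ?)",
    [(1, "a"), (2, "b"), (3, "c")],
)

condition, params = values_subquery_condition("id", [1, 3])
rows = conn.execute(
    f"SELECT name FROM item WHERE {condition} ORDER BY id", params
).fetchall()
```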

(M) TypeDecorators

class peek_plugin_base.storage.TypeDecorators.PeekLargeBinary(*args, **kwargs)

Bases: sqlalchemy.sql.type_api.TypeDecorator

Peek Large Binary

Construct a TypeDecorator.

Arguments sent here are passed to the constructor of the class assigned to the impl class level attribute, assuming the impl is a callable, and the resulting object is assigned to the self.impl instance attribute (thus overriding the class attribute of the same name).

If the class level impl is not a callable (the unusual case), it will be assigned to the same instance attribute ‘as-is’, ignoring those arguments passed to the constructor.

Subclasses can override this to customize the generation of self.impl entirely.
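The constructor rule described above can be hard to picture, so here is a toy model of it in plain Python. It does not use SQLAlchemy: MiniTypeDecorator and FakeLargeBinary are hypothetical stand-ins written only to mirror the described behaviour, where a callable impl is instantiated with the constructor arguments and a non-callable impl is kept as-is.

```python
class FakeLargeBinary:
    """Hypothetical stand-in for a type class such as LargeBinary."""

    def __init__(self, length=None):
        self.length = length


class MiniTypeDecorator:
    """Toy model of the constructor rule described above (not SQLAlchemy)."""

    impl = None  # Subclasses set this at class level.

    def __init__(self, *args, **kwargs):
        impl = type(self).impl
        if callable(impl):
            # Usual case: impl is a class, so it is instantiated with the
            # constructor arguments and stored on the instance.
            self.impl = impl(*args, **kwargs)
        else:
            # Unusual case: impl is already an instance; it is assigned
            # as-is and the constructor arguments are ignored.
            self.impl = impl


class BinaryColumn(MiniTypeDecorator):
    impl = FakeLargeBinary  # a class, therefore callable


class PreconfiguredColumn(MiniTypeDecorator):
    impl = FakeLargeBinary(length=16)  # an instance, not callable


col = BinaryColumn(length=1024)
pre = PreconfiguredColumn(length=999)  # length=999 is ignored
```

Note that the instance attribute shadows the class attribute: BinaryColumn.impl is still the class, while col.impl is the configured instance.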


Given a bind value (i.e. a BindParameter instance), return a SQL expression in its place.

This is typically a SQL function that wraps the existing bound parameter within the statement. It is used for special data types that require literals being wrapped in some special database function in order to coerce an application-level value into a database-specific format. It is the SQL analogue of the TypeEngine.bind_processor() method.

The method is evaluated at statement compile time, as opposed to statement construction time.

Note that this method, when implemented, should always return the exact same structure, without any conditional logic, as it may be used in an executemany() call against an arbitrary number of bound parameter sets.


alias of sqlalchemy.sql.sqltypes.LargeBinary