import logging
from alembic import context
from sqlalchemy import engine_from_config, pool
from sqlalchemy.dialects.mssql.base import MSDialect
from sqlalchemy.dialects.postgresql.base import PGDialect
log = logging.getLogger(__name__)
[docs]def isMssqlDialect(engine):
return isinstance(engine.dialect, MSDialect)
[docs]def isPostGreSQLDialect(engine):
return isinstance(engine.dialect, PGDialect)
[docs]def ensureSchemaExists(engine, schemaName):
# Ensure the schema exists
if isinstance(engine.dialect, MSDialect):
if list(engine.execute("SELECT SCHEMA_ID('%s')" % schemaName))[0][0] is None:
engine.execute("CREATE SCHEMA [%s]" % schemaName)
elif isinstance(engine.dialect, PGDialect):
engine.execute('CREATE SCHEMA IF NOT EXISTS "%s" ' % schemaName)
else:
raise Exception("unknown dialect %s" % engine.dialect)
[docs]class AlembicEnvBase:
def __init__(self, targetMetadata):
from peek_platform.util.LogUtil import setupPeekLogger
setupPeekLogger()
self._config = context.config
self._targetMetadata = targetMetadata
self._schemaName = targetMetadata.schema
def _includeObjectFilter(self, object, name, type_, reflected, compare_to):
# If it's not in this schema, don't include it
if hasattr(object, "schema") and object.schema != self._schemaName:
return False
return True
[docs] def run(self):
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
self._config.get_section(self._config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
ensureSchemaExists(connectable, self._schemaName)
log.info("Migrating Peek schema %s" % self._schemaName)
schemaWiseConnection = connection.execution_options(
schema_translate_map={None: self._schemaName}
)
context.configure(
connection=schemaWiseConnection,
target_metadata=self._targetMetadata,
include_object=self._includeObjectFilter,
include_schemas=False,
# process_revision_directives=self._process_revision_directives,
version_table_schema=self._schemaName,
)
# schema-wise transactions
with context.begin_transaction():
context.run_migrations()