import sqlalchemy as sa
from sqlalchemy import exc as sa_exc
from sqlalchemy import types as sql_types
from sqlalchemy import inspect
from sqlalchemy import MetaData, Integer, String, func
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.testing import engines, fixtures
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.testing import eq_, is_, assert_raises_message
from sqlalchemy import testing
from .. import config
import operator
from sqlalchemy.schema import DDL, Index
from sqlalchemy import event
from sqlalchemy.sql.elements import quoted_name
from sqlalchemy import ForeignKey
import re
# Module-level placeholders; both names start out unset.
metadata = users = None
class HasTableTest(fixtures.TablesTest):
    """Exercise ``dialect.has_table()`` for a present and an absent table."""

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Declare the single ``test_table`` that the test probes for."""
        id_column = Column('id', Integer, primary_key=True)
        data_column = Column('data', String(50))
        Table('test_table', metadata, id_column, data_column)

    def test_has_table(self):
        """``has_table`` is true for ``test_table``, false for an unknown name."""
        with config.db.begin() as connection:
            found = config.db.dialect.has_table(connection, "test_table")
            assert found
            missing = config.db.dialect.has_table(
                connection, "nonexistent_table")
            assert not missing
# Reflection tests that run against the tables, indexes, views and temporary
# tables declared by this class's define_* classmethods.
class ComponentReflectionTest(fixtures.TablesTest):
# NOTE(review): presumably None switches off the per-test insert/delete
# steps of fixtures.TablesTest -- confirm against that base class.
run_inserts = run_deletes = None
# NOTE(review): looks like this flags the suite as backend-dependent for the
# test runner -- confirm.
__backend__ = True
@classmethod
def setup_bind(cls):
    """Return the engine the fixture tables are bound to.

    When the ``independent_connections`` requirement is enabled, a
    dedicated testing engine using ``StaticPool`` is created; otherwise
    the shared ``config.db`` engine is returned.
    """
    if not config.requirements.independent_connections.enabled:
        return config.db

    from sqlalchemy import pool
    engine_options = {'poolclass': pool.StaticPool}
    return engines.testing_engine(options=engine_options)
@classmethod
def define_tables(cls, metadata):
    """Declare the reflected tables in the default schema and, when the
    ``schemas`` requirement is enabled, again in the configured test schema.
    """
    target_schemas = [None]
    if testing.requires.schemas.enabled:
        target_schemas.append(testing.config.test_schema)

    for target_schema in target_schemas:
        cls.define_reflected_tables(metadata, target_schema)
# Declare on ``metadata`` the tables, indexes, views and temporary tables the
# reflection tests inspect.  ``schema`` is either falsy (default schema) or a
# schema name that every table declared here is qualified with.
@classmethod
def define_reflected_tables(cls, metadata, schema):
# Prefix used to build schema-qualified ForeignKey target strings below.
if schema:
schema_prefix = schema + "."
else:
schema_prefix = ""
# ``users`` carries a self-referential FK (parent_user_id -> users.user_id,
# named 'user_id_fk') only when the self_referential_foreign_keys
# requirement is enabled; otherwise the column is left out entirely.
if testing.requires.self_referential_foreign_keys.enabled:
users = Table('users', metadata,
Column('user_id', sa.INT, primary_key=True),
Column('test1', sa.CHAR(5), nullable=False),
Column('test2', sa.Float(5), nullable=False),
Column('parent_user_id', sa.Integer,
sa.ForeignKey('%susers.user_id' %
schema_prefix,
name='user_id_fk')),
schema=schema,
test_needs_fk=True,
)
else:
users = Table('users', metadata,
Column('user_id', sa.INT, primary_key=True),
Column('test1', sa.CHAR(5), nullable=False),
Column('test2', sa.Float(5), nullable=False),
schema=schema,
test_needs_fk=True,
)
# dingalings references email_addresses by (schema-qualified) string name.
Table("dingalings", metadata,
Column('dingaling_id', sa.Integer, primary_key=True),
Column('address_id', sa.Integer,
sa.ForeignKey('%semail_addresses.address_id' %
schema_prefix)),
Column('data', sa.String(30)),
schema=schema,
test_needs_fk=True,
)
# email_addresses references users through the column object and declares
# its primary key as an explicitly named constraint ('email_ad_pk').
Table('email_addresses', metadata,
Column('address_id', sa.Integer),
Column('remote_user_id', sa.Integer,
sa.ForeignKey(users.c.user_id)),
Column('email_address', sa.String(20)),
sa.PrimaryKeyConstraint('address_id', name='email_ad_pk'),
schema=schema,
test_needs_fk=True,
)
# Table and column comments deliberately contain %, quote and backslash
# characters.
Table('comment_test', metadata,
Column('id', sa.Integer, primary_key=True, comment='id comment'),
Column('data', sa.String(20), comment='data % comment'),
Column(
'd2', sa.String(20),
comment=r"""Comment types type speedily ' " \ '' Fun!"""),
schema=schema,
comment=r"""the test % ' " \ table comment""")
# Cross-schema FK fixtures: local_table (declared when ``schema`` is None,
# in the dialect's default schema) points at remote_table_2 in the test
# schema; remote_table points back at local_table in the default schema.
if testing.requires.cross_schema_fk_reflection.enabled:
if schema is None:
Table(
'local_table', metadata,
Column('id', sa.Integer, primary_key=True),
Column('data', sa.String(20)),
Column(
'remote_id',
ForeignKey(
'%s.remote_table_2.id' %
testing.config.test_schema)
),
test_needs_fk=True,
schema=config.db.dialect.default_schema_name
)
else:
Table(
'remote_table', metadata,
Column('id', sa.Integer, primary_key=True),
Column(
'local_id',
ForeignKey(
'%s.local_table.id' %
config.db.dialect.default_schema_name)
),
Column('data', sa.String(20)),
schema=schema,
test_needs_fk=True,
)
Table(
'remote_table_2', metadata,
Column('id', sa.Integer, primary_key=True),
Column('data', sa.String(20)),
schema=schema,
test_needs_fk=True,
)
# Index fixtures: define_index() receives the ``users`` table; the
# noncol_idx_* tables get indexes on a descending expression (q DESC)
# instead of a plain column.
if testing.requires.index_reflection.enabled:
cls.define_index(metadata, users)
if not schema:
# test_needs_fk is at the moment to force MySQL InnoDB
noncol_idx_test_nopk = Table(
'noncol_idx_test_nopk', metadata,
Column('q', sa.String(5)),
test_needs_fk=True,
)
noncol_idx_test_pk = Table(
'noncol_idx_test_pk', metadata,
Column('id', sa.Integer, primary_key=True),
Column('q', sa.String(5)),
test_needs_fk=True,
)
Index('noncol_idx_nopk', noncol_idx_test_nopk.c.q.desc())
Index('noncol_idx_pk', noncol_idx_test_pk.c.q.desc())
# Views are declared per schema; temporary tables only when no schema is
# given and the temp_table_reflection requirement is enabled.
if testing.requires.view_column_reflection.enabled:
cls.define_views(metadata, schema)
if not schema and testing.requires.temp_table_reflection.enabled:
cls.define_temp_tables(metadata)
@classmethod
def define_temp_tables(cls, metadata):
    """Declare the ``user_tmp`` temporary table on ``metadata``.

    When both the ``view_reflection`` and ``temporary_views`` requirements
    are enabled, DDL listeners are attached to the table that create the
    temporary view ``user_tmp_v`` after the table is created and drop it
    before the table is dropped.
    """
    # cheat a bit, we should fix this with some dialect-level
    # temp table fixture
    if testing.against("oracle"):
        table_kwargs = dict(
            prefixes=["GLOBAL TEMPORARY"],
            oracle_on_commit='PRESERVE ROWS',
        )
    else:
        table_kwargs = dict(prefixes=["TEMPORARY"])

    temp_table = Table(
        "user_tmp", metadata,
        Column("id", sa.INT, primary_key=True),
        Column('name', sa.VARCHAR(50)),
        Column('foo', sa.INT),
        sa.UniqueConstraint('name', name='user_tmp_uq'),
        sa.Index("user_tmp_ix", "foo"),
        **table_kwargs
    )

    requirements = testing.requires
    if not (requirements.view_reflection.enabled and
            requirements.temporary_views.enabled):
        return

    view_hooks = (
        ("after_create",
         "create temporary view user_tmp_v as "
         "select * from user_tmp"),
        ("before_drop", "drop view user_tmp_v"),
    )
    for hook_name, statement in view_hooks:
        event.listen(temp_table, hook_name, DDL(statement))
@classmethod
def define_index(cls, metadata, users):
    """Attach two composite indexes to the given ``users`` table."""
    columns = users.c
    Index("users_t_idx", columns.test1, columns.test2)
    Index("users_all_idx", columns.user_id, columns.test2, columns.test1)
@classmethod
def define_views(cls, metadata, schema):
    """Register DDL on ``metadata`` creating ``<table>_v`` views.

    One ``SELECT *`` view is declared for each of ``users`` and
    ``email_addresses``; names are qualified with ``schema`` when given.
    Each view is created after the metadata is created and dropped before
    it is dropped.
    """
    for base_name in ('users', 'email_addresses'):
        source_name = (
            "%s.%s" % (schema, base_name) if schema else base_name)
        view_name = source_name + '_v'

        create_ddl = DDL(
            "CREATE VIEW %s AS SELECT * FROM %s" % (view_name, source_name))
        event.listen(metadata, "after_create", create_ddl)

        drop_ddl = DDL("DROP VIEW %s" % view_name)
        event.listen(metadata, "before_drop", drop_ddl)
@testing.requires.schema_reflection
def test_get_schema_names(self):
    """The configured test schema appears in the reflected schema names."""
    inspector = inspect(testing.db)
    wanted_schema = testing.config.test_schema
    self.assert_(wanted_schema in inspector.get_schema_names())
@testing.requires.schema_reflection
def test_dialect_initialize(self):
    """A fresh engine's dialect has no ``default_schema_name`` attribute
    until ``inspect()`` has been called on the engine.
    """
    attribute = 'default_schema_name'
    fresh_engine = engines.testing_engine()

    assert not hasattr(fresh_engine.dialect, attribute)
    inspect(fresh_engine)
    assert hasattr(fresh_engine.dialect, attribute)
@testing.requires.schema_reflection
def test_get_default_schema_name(self):
    """The inspector reports the same default schema as the dialect."""
    # Build the inspector first, before reading the dialect attribute.
    inspector = inspect(testing.db)
    reflected_default = inspector.default_schema_name
    eq_(reflected_default, testing.db.dialect.default_schema_name)
@testing.provide_metadata
def _test_get_table_names(self, schema=None, table_type='table',
                          order_by=None):
    """Check the table or view names reported by the inspector.

    :param schema: schema name passed through to the inspector, or None.
    :param table_type: ``'view'`` to check view names; any other value
     checks table names.
    :param order_by: passed to ``Inspector.get_table_names()``; with
     ``'foreign_key'`` the returned order itself is asserted, otherwise
     the names are compared after sorting.
    """
    insp = inspect(self.metadata.bind)

    if table_type == 'view':
        # sorted() works on a copy, so the list handed back by the
        # inspector is not modified in place.
        eq_(
            sorted(insp.get_view_names(schema)),
            ['email_addresses_v', 'users_v']
        )
        return

    # Fixture tables that exist in the database but are not part of the
    # expected answer below.
    ignored_tables = frozenset((
        'comment_test', 'noncol_idx_test_pk', 'noncol_idx_test_nopk',
        'local_table', 'remote_table', 'remote_table_2'
    ))
    table_names = [
        name for name in insp.get_table_names(schema, order_by=order_by)
        if name not in ignored_tables
    ]

    if order_by == 'foreign_key':
        eq_(table_names, ['users', 'email_addresses', 'dingalings'])
    else:
        eq_(sorted(table_names), ['dingalings', 'email_addresses', 'users'])
@testing.requires.temp_table_names
def test_get_temp_table_names(self):
    """``user_tmp`` is the only temporary table name reported."""
    reflected_names = inspect(self.bind).get_temp_table_names()
    eq_(sorted(reflected_names), ['user_tmp'])
@testing.requires.view_reflection
@testing.requires.temp_table_names
@testing.requires.temporary_views
def test_get_temp_view_names(self):
    """``user_tmp_v`` is the only temporary view name reported."""
    reflected_views = inspect(self.bind).get_temp_view_names()
    eq_(sorted(reflected_views), ['user_tmp_v'])
@testing.requires.table_reflection
def test_get_table_names(self):
    """Run the table-name check with its default arguments."""
    self._test_get_table_names()
@testing.requires.table_reflection
@testing.requires.foreign_key_constraint_reflection
def test_get_table_names_fks(self):
    """Run the table-name check with foreign-key ordering requested."""
    ordering = 'foreign_key'
    self._test_get_table_names(order_by=ordering)
@testing.requires.comment_reflection
def test_get_comments(self):
    """Run the comment-reflection check without a schema argument."""
    self._test_get_comments()
@testing.requires.comment_reflection
@testing.requires.schemas
def test_get_comments_with_schema(self):
    """Run the comment-reflection check against the configured test schema."""
    self._test_get_comments(schema=testing.config.test_schema)
def _test_get_comments(self, schema=None):
    """Check reflected table and column comments.

    Asserts the table comment of ``comment_test``, a ``None`` comment for
    ``users``, and the name/comment pair of each ``comment_test`` column,
    all looked up with the given ``schema``.
    """
    inspector = inspect(testing.db)

    expected_table_comments = (
        ("comment_test", r"""the test % ' " \ table comment"""),
        ("users", None),
    )
    for table_name, comment_text in expected_table_comments:
        eq_(
            inspector.get_table_comment(table_name, schema=schema),
            {"text": comment_text}
        )

    reflected_columns = [
        dict(name=column['name'], comment=column['comment'])
        for column in inspector.get_columns("comment_test", schema=schema)
    ]
    expected_columns = [
        {'comment': 'id comment', 'name': 'id'},
        {'comment': 'data % comment', 'name': 'data'},
        {'comment': r"""Comment types type speedily ' " \ '' Fun!""",
         'name': 'd2'}
    ]
    eq_(reflected_columns, expected_columns)
@testing.requires.table_reflection
Loading ...