Learn more » Push, build, and install: RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Bower components, Debian packages, RPM packages, NuGet packages

aroundthecode / SQLAlchemy   python

Repository URL to install this package:

Version: 1.2.10 

/ testing / suite / test_reflection.py



import sqlalchemy as sa
from sqlalchemy import exc as sa_exc
from sqlalchemy import types as sql_types
from sqlalchemy import inspect
from sqlalchemy import MetaData, Integer, String, func
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.testing import engines, fixtures
from sqlalchemy.testing.schema import Table, Column
from sqlalchemy.testing import eq_, is_, assert_raises_message
from sqlalchemy import testing
from .. import config
import operator
from sqlalchemy.schema import DDL, Index
from sqlalchemy import event
from sqlalchemy.sql.elements import quoted_name
from sqlalchemy import ForeignKey
import re

# NOTE(review): module-level placeholders.  The code visible in this part of
# the file only uses ``metadata`` / ``users`` as parameters or local names and
# never reads these globals -- confirm against the rest of the file before
# removing them.
metadata, users = None, None


class HasTableTest(fixtures.TablesTest):
    """Exercise the dialect-level ``has_table()`` call on a real connection."""

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        """Declare the single ``test_table`` fixture table on *metadata*."""
        columns = (
            Column('id', Integer, primary_key=True),
            Column('data', String(50)),
        )
        Table('test_table', metadata, *columns)

    def test_has_table(self):
        """The fixture table is reported as present, an unknown name is not."""
        with config.db.begin() as conn:
            has_table = config.db.dialect.has_table
            assert has_table(conn, "test_table")
            assert not has_table(conn, "nonexistent_table")


class ComponentReflectionTest(fixtures.TablesTest):
    # NOTE(review): both fixture hooks are set to None here; presumably this
    # switches off the per-test insert / delete steps of TablesTest --
    # confirm against sqlalchemy.testing.fixtures.
    run_inserts = run_deletes = None

    # NOTE(review): same marker as on HasTableTest; presumably tells the
    # test runner to execute this class against each backend -- confirm.
    __backend__ = True

    @classmethod
    def setup_bind(cls):
        """Return the engine this fixture binds to.

        When the ``independent_connections`` requirement is not enabled the
        shared ``config.db`` engine is returned unchanged; otherwise a new
        testing engine is built with ``StaticPool`` as its pool class.
        """
        if not config.requirements.independent_connections.enabled:
            return config.db

        from sqlalchemy import pool
        engine_options = {'poolclass': pool.StaticPool}
        return engines.testing_engine(options=engine_options)

    @classmethod
    def define_tables(cls, metadata):
        """Declare the reflected fixture tables on *metadata*.

        The tables are always declared without a schema; when the
        ``schemas`` requirement is enabled they are declared a second time
        under ``testing.config.test_schema``.
        """
        declare = cls.define_reflected_tables
        declare(metadata, None)
        if not testing.requires.schemas.enabled:
            return
        declare(metadata, testing.config.test_schema)

    @classmethod
    def define_reflected_tables(cls, metadata, schema):
        """Declare every object the reflection tests look for.

        Adds the ``users``, ``dingalings``, ``email_addresses`` and
        ``comment_test`` tables to *metadata*, followed by the optional
        cross-schema tables, indexes, views and temporary tables; each
        optional group is guarded by its ``testing.requires`` flag.

        :param metadata: collection the ``Table`` objects are attached to.
        :param schema: schema name for the tables, or ``None`` / empty for
         unqualified tables.
        """
        # string-based foreign key targets need the schema spelled out
        schema_prefix = (schema + ".") if schema else ""

        user_columns = [
            Column('user_id', sa.INT, primary_key=True),
            Column('test1', sa.CHAR(5), nullable=False),
            Column('test2', sa.Float(5), nullable=False),
        ]
        if testing.requires.self_referential_foreign_keys.enabled:
            # the self-referencing column is the only difference between
            # the two variants of this table
            parent_fk = sa.ForeignKey(
                '%susers.user_id' % schema_prefix, name='user_id_fk')
            user_columns.append(
                Column('parent_user_id', sa.Integer, parent_fk))
        users = Table(
            'users', metadata,
            *user_columns,
            schema=schema,
            test_needs_fk=True
        )

        address_fk = sa.ForeignKey(
            '%semail_addresses.address_id' % schema_prefix)
        Table(
            "dingalings", metadata,
            Column('dingaling_id', sa.Integer, primary_key=True),
            Column('address_id', sa.Integer, address_fk),
            Column('data', sa.String(30)),
            schema=schema,
            test_needs_fk=True
        )
        Table(
            'email_addresses', metadata,
            Column('address_id', sa.Integer),
            Column(
                'remote_user_id', sa.Integer,
                sa.ForeignKey(users.c.user_id)),
            Column('email_address', sa.String(20)),
            sa.PrimaryKeyConstraint('address_id', name='email_ad_pk'),
            schema=schema,
            test_needs_fk=True
        )

        # comments deliberately contain quoting / escaping characters
        table_comment = r"""the test % ' " \ table comment"""
        d2_comment = r"""Comment types type speedily ' " \ '' Fun!"""
        Table(
            'comment_test', metadata,
            Column('id', sa.Integer, primary_key=True, comment='id comment'),
            Column('data', sa.String(20), comment='data % comment'),
            Column('d2', sa.String(20), comment=d2_comment),
            schema=schema,
            comment=table_comment
        )

        if testing.requires.cross_schema_fk_reflection.enabled:
            default_schema = config.db.dialect.default_schema_name
            if schema is not None:
                # tables living in the explicit schema; remote_table points
                # back at local_table in the default schema
                Table(
                    'remote_table', metadata,
                    Column('id', sa.Integer, primary_key=True),
                    Column(
                        'local_id',
                        ForeignKey('%s.local_table.id' % default_schema)
                    ),
                    Column('data', sa.String(20)),
                    schema=schema,
                    test_needs_fk=True
                )
                Table(
                    'remote_table_2', metadata,
                    Column('id', sa.Integer, primary_key=True),
                    Column('data', sa.String(20)),
                    schema=schema,
                    test_needs_fk=True
                )
            else:
                # table in the default schema pointing at remote_table_2 in
                # the test schema
                remote_target = (
                    '%s.remote_table_2.id' % testing.config.test_schema)
                Table(
                    'local_table', metadata,
                    Column('id', sa.Integer, primary_key=True),
                    Column('data', sa.String(20)),
                    Column('remote_id', ForeignKey(remote_target)),
                    test_needs_fk=True,
                    schema=default_schema
                )

        if testing.requires.index_reflection.enabled:
            cls.define_index(metadata, users)

            if not schema:
                # test_needs_fk is at the moment to force MySQL InnoDB
                nopk_table = Table(
                    'noncol_idx_test_nopk', metadata,
                    Column('q', sa.String(5)),
                    test_needs_fk=True
                )
                pk_table = Table(
                    'noncol_idx_test_pk', metadata,
                    Column('id', sa.Integer, primary_key=True),
                    Column('q', sa.String(5)),
                    test_needs_fk=True
                )
                # indexes on an expression (descending order) rather than
                # on a plain column
                Index('noncol_idx_nopk', nopk_table.c.q.desc())
                Index('noncol_idx_pk', pk_table.c.q.desc())

        if testing.requires.view_column_reflection.enabled:
            cls.define_views(metadata, schema)
        if not schema and testing.requires.temp_table_reflection.enabled:
            cls.define_temp_tables(metadata)

    @classmethod
    def define_temp_tables(cls, metadata):
        """Declare the ``user_tmp`` temporary table on *metadata*.

        When both the ``view_reflection`` and ``temporary_views``
        requirements are enabled, DDL is also attached to the table that
        creates the ``user_tmp_v`` temporary view after the table is created
        and drops it before the table is dropped.
        """
        # cheat a bit, we should fix this with some dialect-level
        # temp table fixture
        if testing.against("oracle"):
            table_options = dict(
                prefixes=["GLOBAL TEMPORARY"],
                oracle_on_commit='PRESERVE ROWS')
        else:
            table_options = dict(prefixes=["TEMPORARY"])

        user_tmp = Table(
            "user_tmp", metadata,
            Column("id", sa.INT, primary_key=True),
            Column('name', sa.VARCHAR(50)),
            Column('foo', sa.INT),
            sa.UniqueConstraint('name', name='user_tmp_uq'),
            sa.Index("user_tmp_ix", "foo"),
            **table_options
        )

        requires = testing.requires
        if not (requires.view_reflection.enabled and
                requires.temporary_views.enabled):
            return

        view_ddl = (
            ("after_create",
             "create temporary view user_tmp_v as "
             "select * from user_tmp"),
            ("before_drop", "drop view user_tmp_v"),
        )
        for event_name, statement in view_ddl:
            event.listen(user_tmp, event_name, DDL(statement))

    @classmethod
    def define_index(cls, metadata, users):
        """Create the two multi-column indexes on the *users* table.

        *metadata* is accepted but not used by this method.
        """
        cols = users.c
        index_specs = (
            ("users_t_idx", (cols.test1, cols.test2)),
            ("users_all_idx", (cols.user_id, cols.test2, cols.test1)),
        )
        for index_name, index_columns in index_specs:
            Index(index_name, *index_columns)

    @classmethod
    def define_views(cls, metadata, schema):
        """Attach view-creating and view-dropping DDL to *metadata*.

        For each of ``users`` and ``email_addresses`` a ``<table>_v`` view
        selecting everything from the table is created after the metadata
        is created and dropped before it is dropped.  Both the table and the
        view name are schema-qualified when *schema* is given.
        """
        for table_name in ('users', 'email_addresses'):
            qualified = (
                "%s.%s" % (schema, table_name) if schema else table_name)
            view_name = qualified + '_v'

            create_stmt = "CREATE VIEW %s AS SELECT * FROM %s" % (
                view_name, qualified)
            event.listen(metadata, "after_create", DDL(create_stmt))

            drop_stmt = "DROP VIEW %s" % view_name
            event.listen(metadata, "before_drop", DDL(drop_stmt))

    @testing.requires.schema_reflection
    def test_get_schema_names(self):
        """The configured test schema is among the reflected schema names."""
        schema_names = inspect(testing.db).get_schema_names()

        self.assert_(testing.config.test_schema in schema_names)

    @testing.requires.schema_reflection
    def test_dialect_initialize(self):
        """A new engine's dialect has ``default_schema_name`` only after
        the engine has been inspected."""
        attr_name = 'default_schema_name'
        engine = engines.testing_engine()

        assert not hasattr(engine.dialect, attr_name)
        inspect(engine)
        assert hasattr(engine.dialect, attr_name)

    @testing.requires.schema_reflection
    def test_get_default_schema_name(self):
        """The inspector and the dialect agree on the default schema name."""
        # build the inspector first, then read the dialect attribute
        inspector = inspect(testing.db)
        reflected = inspector.default_schema_name
        expected = testing.db.dialect.default_schema_name
        eq_(reflected, expected)

    @testing.provide_metadata
    def _test_get_table_names(self, schema=None, table_type='table',
                              order_by=None):
        """Compare reflected table or view names with the fixture names.

        :param schema: schema name handed to the inspector; ``None`` leaves
         it unspecified.
        :param table_type: ``'view'`` checks ``get_view_names()``; any other
         value checks ``get_table_names()``.
        :param order_by: forwarded to ``get_table_names()``.  With
         ``'foreign_key'`` the returned order itself is asserted; otherwise
         the names are compared after sorting.
        """
        # auxiliary fixture tables that are not part of the name comparison
        _ignore_tables = [
            'comment_test', 'noncol_idx_test_pk', 'noncol_idx_test_nopk',
            'local_table', 'remote_table', 'remote_table_2'
        ]
        insp = inspect(self.metadata.bind)

        if table_type == 'view':
            # sorted() builds a new list, so the list handed back by the
            # inspector is compared without being modified in place
            eq_(
                sorted(insp.get_view_names(schema)),
                ['email_addresses_v', 'users_v']
            )
            return

        table_names = [
            t for t in insp.get_table_names(schema, order_by=order_by)
            if t not in _ignore_tables
        ]

        if order_by == 'foreign_key':
            # the order is what is being tested here: compare as returned
            eq_(table_names, ['users', 'email_addresses', 'dingalings'])
        else:
            eq_(
                sorted(table_names),
                ['dingalings', 'email_addresses', 'users']
            )

    @testing.requires.temp_table_names
    def test_get_temp_table_names(self):
        """Only ``user_tmp`` is reported as a temporary table."""
        reflected = inspect(self.bind).get_temp_table_names()
        eq_(sorted(reflected), ['user_tmp'])

    @testing.requires.view_reflection
    @testing.requires.temp_table_names
    @testing.requires.temporary_views
    def test_get_temp_view_names(self):
        """Only ``user_tmp_v`` is reported as a temporary view."""
        temp_view_names = inspect(self.bind).get_temp_view_names()
        eq_(sorted(temp_view_names), ['user_tmp_v'])

    @testing.requires.table_reflection
    def test_get_table_names(self):
        """Reflect table names with no schema and no particular ordering."""
        self._test_get_table_names(order_by=None)

    @testing.requires.table_reflection
    @testing.requires.foreign_key_constraint_reflection
    def test_get_table_names_fks(self):
        """Reflect table names ordered by foreign key dependency."""
        ordering = 'foreign_key'
        self._test_get_table_names(order_by=ordering)

    @testing.requires.comment_reflection
    def test_get_comments(self):
        """Check reflected table and column comments without a schema."""
        self._test_get_comments(schema=None)

    @testing.requires.comment_reflection
    @testing.requires.schemas
    def test_get_comments_with_schema(self):
        """Check reflected table and column comments in the test schema."""
        target_schema = testing.config.test_schema
        self._test_get_comments(target_schema)

    def _test_get_comments(self, schema=None):
        """Assert the comments reflected for ``comment_test`` and ``users``.

        :param schema: schema name passed to every inspector call.
        """
        inspector = inspect(testing.db)

        # table-level comment, including the quoting / escaping characters
        table_comment = inspector.get_table_comment(
            "comment_test", schema=schema)
        eq_(table_comment, {"text": r"""the test % ' " \ table comment"""})

        # a table declared without a comment reflects None as its text
        users_comment = inspector.get_table_comment("users", schema=schema)
        eq_(users_comment, {"text": None})

        # column-level comments, reduced to the two keys being compared
        reflected = []
        for column in inspector.get_columns("comment_test", schema=schema):
            reflected.append(
                {"name": column['name'], "comment": column['comment']})

        expected = [
            {'comment': 'id comment', 'name': 'id'},
            {'comment': 'data % comment', 'name': 'data'},
            {'comment': r"""Comment types type speedily ' " \ '' Fun!""",
             'name': 'd2'},
        ]
        eq_(reflected, expected)

    @testing.requires.table_reflection
Loading ...