Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

aaronreidsmith / pandas   python

Repository URL to install this package:

Version: 0.25.3 

/ tests / io / test_sql.py

"""SQL io tests

The SQL tests are broken down in different classes:

- `PandasSQLTest`: base class with common methods for all test classes
- Tests for the public API (only tests with sqlite3)
    - `_TestSQLApi` base class
    - `TestSQLApi`: test the public API with sqlalchemy engine
    - `TestSQLiteFallbackApi`: test the public API with a sqlite DBAPI
      connection
- Tests for the different SQL flavors (flavor specific type conversions)
    - Tests for the sqlalchemy mode: `_TestSQLAlchemy` is the base class with
      common methods, `_TestSQLAlchemyConn` tests the API with a SQLAlchemy
      Connection object. The different tested flavors (sqlite3, MySQL,
      PostgreSQL) derive from the base class
    - Tests for the fallback mode (`TestSQLiteFallback`)

"""

import csv
from datetime import date, datetime, time
from io import StringIO
import sqlite3
import warnings

import numpy as np
import pytest

from pandas.compat import PY36

from pandas.core.dtypes.common import is_datetime64_dtype, is_datetime64tz_dtype

import pandas as pd
from pandas import (
    DataFrame,
    Index,
    MultiIndex,
    Series,
    Timestamp,
    concat,
    date_range,
    isna,
    to_datetime,
    to_timedelta,
)
import pandas.util.testing as tm

import pandas.io.sql as sql
from pandas.io.sql import read_sql_query, read_sql_table

try:
    import sqlalchemy
    import sqlalchemy.schema
    import sqlalchemy.sql.sqltypes as sqltypes
    from sqlalchemy.ext import declarative
    from sqlalchemy.orm import session as sa_session

    SQLALCHEMY_INSTALLED = True
except ImportError:
    SQLALCHEMY_INSTALLED = False

SQL_STRINGS = {
    "create_iris": {
        "sqlite": """CREATE TABLE iris (
                "SepalLength" REAL,
                "SepalWidth" REAL,
                "PetalLength" REAL,
                "PetalWidth" REAL,
                "Name" TEXT
            )""",
        "mysql": """CREATE TABLE iris (
                `SepalLength` DOUBLE,
                `SepalWidth` DOUBLE,
                `PetalLength` DOUBLE,
                `PetalWidth` DOUBLE,
                `Name` VARCHAR(200)
            )""",
        "postgresql": """CREATE TABLE iris (
                "SepalLength" DOUBLE PRECISION,
                "SepalWidth" DOUBLE PRECISION,
                "PetalLength" DOUBLE PRECISION,
                "PetalWidth" DOUBLE PRECISION,
                "Name" VARCHAR(200)
            )""",
    },
    "insert_iris": {
        "sqlite": """INSERT INTO iris VALUES(?, ?, ?, ?, ?)""",
        "mysql": """INSERT INTO iris VALUES(%s, %s, %s, %s, "%s");""",
        "postgresql": """INSERT INTO iris VALUES(%s, %s, %s, %s, %s);""",
    },
    "create_test_types": {
        "sqlite": """CREATE TABLE types_test_data (
                    "TextCol" TEXT,
                    "DateCol" TEXT,
                    "IntDateCol" INTEGER,
                    "IntDateOnlyCol" INTEGER,
                    "FloatCol" REAL,
                    "IntCol" INTEGER,
                    "BoolCol" INTEGER,
                    "IntColWithNull" INTEGER,
                    "BoolColWithNull" INTEGER
                )""",
        "mysql": """CREATE TABLE types_test_data (
                    `TextCol` TEXT,
                    `DateCol` DATETIME,
                    `IntDateCol` INTEGER,
                    `IntDateOnlyCol` INTEGER,
                    `FloatCol` DOUBLE,
                    `IntCol` INTEGER,
                    `BoolCol` BOOLEAN,
                    `IntColWithNull` INTEGER,
                    `BoolColWithNull` BOOLEAN
                )""",
        "postgresql": """CREATE TABLE types_test_data (
                    "TextCol" TEXT,
                    "DateCol" TIMESTAMP,
                    "DateColWithTz" TIMESTAMP WITH TIME ZONE,
                    "IntDateCol" INTEGER,
                    "IntDateOnlyCol" INTEGER,
                    "FloatCol" DOUBLE PRECISION,
                    "IntCol" INTEGER,
                    "BoolCol" BOOLEAN,
                    "IntColWithNull" INTEGER,
                    "BoolColWithNull" BOOLEAN
                )""",
    },
    "insert_test_types": {
        "sqlite": {
            "query": """
                INSERT INTO types_test_data
                VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
            "fields": (
                "TextCol",
                "DateCol",
                "IntDateCol",
                "IntDateOnlyCol",
                "FloatCol",
                "IntCol",
                "BoolCol",
                "IntColWithNull",
                "BoolColWithNull",
            ),
        },
        "mysql": {
            "query": """
                INSERT INTO types_test_data
                VALUES("%s", %s, %s, %s, %s, %s, %s, %s, %s)
                """,
            "fields": (
                "TextCol",
                "DateCol",
                "IntDateCol",
                "IntDateOnlyCol",
                "FloatCol",
                "IntCol",
                "BoolCol",
                "IntColWithNull",
                "BoolColWithNull",
            ),
        },
        "postgresql": {
            "query": """
                INSERT INTO types_test_data
                VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """,
            "fields": (
                "TextCol",
                "DateCol",
                "DateColWithTz",
                "IntDateCol",
                "IntDateOnlyCol",
                "FloatCol",
                "IntCol",
                "BoolCol",
                "IntColWithNull",
                "BoolColWithNull",
            ),
        },
    },
    "read_parameters": {
        "sqlite": "SELECT * FROM iris WHERE Name=? AND SepalLength=?",
        "mysql": 'SELECT * FROM iris WHERE `Name`="%s" AND `SepalLength`=%s',
        "postgresql": 'SELECT * FROM iris WHERE "Name"=%s AND "SepalLength"=%s',
    },
    "read_named_parameters": {
        "sqlite": """
                SELECT * FROM iris WHERE Name=:name AND SepalLength=:length
                """,
        "mysql": """
                SELECT * FROM iris WHERE
                `Name`="%(name)s" AND `SepalLength`=%(length)s
                """,
        "postgresql": """
                SELECT * FROM iris WHERE
                "Name"=%(name)s AND "SepalLength"=%(length)s
                """,
    },
    "create_view": {
        "sqlite": """
                CREATE VIEW iris_view AS
                SELECT * FROM iris
                """
    },
}


class MixInBase:
    def teardown_method(self, method):
        # if setup fails, there may not be a connection to close.
        if hasattr(self, "conn"):
            for tbl in self._get_all_tables():
                self.drop_table(tbl)
            self._close_conn()


class MySQLMixIn(MixInBase):
    def drop_table(self, table_name):
        cur = self.conn.cursor()
        cur.execute(
            "DROP TABLE IF EXISTS {}".format(sql._get_valid_mysql_name(table_name))
        )
        self.conn.commit()

    def _get_all_tables(self):
        cur = self.conn.cursor()
        cur.execute("SHOW TABLES")
        return [table[0] for table in cur.fetchall()]

    def _close_conn(self):
        from pymysql.err import Error

        try:
            self.conn.close()
        except Error:
            pass


class SQLiteMixIn(MixInBase):
    def drop_table(self, table_name):
        self.conn.execute(
            "DROP TABLE IF EXISTS {}".format(sql._get_valid_sqlite_name(table_name))
        )
        self.conn.commit()

    def _get_all_tables(self):
        c = self.conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
        return [table[0] for table in c.fetchall()]

    def _close_conn(self):
        self.conn.close()


class SQLAlchemyMixIn(MixInBase):
    def drop_table(self, table_name):
        sql.SQLDatabase(self.conn).drop_table(table_name)

    def _get_all_tables(self):
        meta = sqlalchemy.schema.MetaData(bind=self.conn)
        meta.reflect()
        table_list = meta.tables.keys()
        return table_list

    def _close_conn(self):
        pass


class PandasSQLTest:
    """
    Base class with common private methods for SQLAlchemy and fallback cases.

    """

    def _get_exec(self):
        if hasattr(self.conn, "execute"):
            return self.conn
        else:
            return self.conn.cursor()

    @pytest.fixture(params=[("io", "data", "iris.csv")])
    def load_iris_data(self, datapath, request):
        import io

        iris_csv_file = datapath(*request.param)

        if not hasattr(self, "conn"):
            self.setup_connect()

        self.drop_table("iris")
        self._get_exec().execute(SQL_STRINGS["create_iris"][self.flavor])

        with io.open(iris_csv_file, mode="r", newline=None) as iris_csv:
            r = csv.reader(iris_csv)
            next(r)  # skip header row
            ins = SQL_STRINGS["insert_iris"][self.flavor]

            for row in r:
                self._get_exec().execute(ins, row)

    def _load_iris_view(self):
        self.drop_table("iris_view")
        self._get_exec().execute(SQL_STRINGS["create_view"][self.flavor])

    def _check_iris_loaded_frame(self, iris_frame):
        pytype = iris_frame.dtypes[0].type
        row = iris_frame.iloc[0]

        assert issubclass(pytype, np.floating)
        tm.equalContents(row.values, [5.1, 3.5, 1.4, 0.2, "Iris-setosa"])

    def _load_test1_data(self):
        columns = ["index", "A", "B", "C", "D"]
        data = [
            (
                "2000-01-03 00:00:00",
                0.980268513777,
                3.68573087906,
                -0.364216805298,
                -1.15973806169,
            ),
            (
                "2000-01-04 00:00:00",
                1.04791624281,
                -0.0412318367011,
                -0.16181208307,
                0.212549316967,
            ),
            (
                "2000-01-05 00:00:00",
                0.498580885705,
                0.731167677815,
                -0.537677223318,
                1.34627041952,
            ),
            (
                "2000-01-06 00:00:00",
                1.12020151869,
                1.56762092543,
                0.00364077397681,
                0.67525259227,
            ),
        ]

        self.test_frame1 = DataFrame(data, columns=columns)

    def _load_test2_data(self):
        df = DataFrame(
            dict(
                A=[4, 1, 3, 6],
                B=["asd", "gsq", "ylt", "jkl"],
                C=[1.1, 3.1, 6.9, 5.3],
                D=[False, True, True, False],
                E=["1990-11-22", "1991-10-26", "1993-11-26", "1995-12-12"],
            )
        )
        df["E"] = to_datetime(df["E"])

        self.test_frame2 = df

    def _load_test3_data(self):
        columns = ["index", "A", "B"]
        data = [
            ("2000-01-03 00:00:00", 2 ** 31 - 1, -1.987670),
            ("2000-01-04 00:00:00", -29, -0.0412318367011),
            ("2000-01-05 00:00:00", 20000, 0.731167677815),
            ("2000-01-06 00:00:00", -290867, 1.56762092543),
        ]

        self.test_frame3 = DataFrame(data, columns=columns)

    def _load_raw_sql(self):
        self.drop_table("types_test_data")
        self._get_exec().execute(SQL_STRINGS["create_test_types"][self.flavor])
        ins = SQL_STRINGS["insert_test_types"][self.flavor]
        data = [
            {
                "TextCol": "first",
                "DateCol": "2000-01-03 00:00:00",
                "DateColWithTz": "2000-01-01 00:00:00-08:00",
                "IntDateCol": 535852800,
                "IntDateOnlyCol": 20101010,
                "FloatCol": 10.10,
                "IntCol": 1,
                "BoolCol": False,
                "IntColWithNull": 1,
                "BoolColWithNull": False,
            },
            {
                "TextCol": "first",
                "DateCol": "2000-01-04 00:00:00",
                "DateColWithTz": "2000-06-01 00:00:00-07:00",
                "IntDateCol": 1356998400,
                "IntDateOnlyCol": 20101212,
                "FloatCol": 10.10,
                "IntCol": 1,
                "BoolCol": False,
                "IntColWithNull": None,
                "BoolColWithNull": None,
            },
        ]

        for d in data:
            self._get_exec().execute(
                ins["query"], [d[field] for field in ins["fields"]]
            )

    def _count_rows(self, table_name):
        result = (
            self._get_exec()
            .execute(
                "SELECT count(*) AS count_1 FROM {table_name}".format(
                    table_name=table_name
                )
            )
            .fetchone()
        )
        return result[0]

    def _read_sql_iris(self):
        iris_frame = self.pandasSQL.read_query("SELECT * FROM iris")
        self._check_iris_loaded_frame(iris_frame)

    def _read_sql_iris_parameter(self):
        query = SQL_STRINGS["read_parameters"][self.flavor]
        params = ["Iris-setosa", 5.1]
        iris_frame = self.pandasSQL.read_query(query, params=params)
        self._check_iris_loaded_frame(iris_frame)

    def _read_sql_iris_named_parameter(self):
        query = SQL_STRINGS["read_named_parameters"][self.flavor]
        params = {"name": "Iris-setosa", "length": 5.1}
        iris_frame = self.pandasSQL.read_query(query, params=params)
        self._check_iris_loaded_frame(iris_frame)

    def _to_sql(self, method=None):
        self.drop_table("test_frame1")

        self.pandasSQL.to_sql(self.test_frame1, "test_frame1", method=method)
        assert self.pandasSQL.has_table("test_frame1")

        num_entries = len(self.test_frame1)
        num_rows = self._count_rows("test_frame1")
        assert num_rows == num_entries

        # Nuke table
        self.drop_table("test_frame1")

    def _to_sql_empty(self):
        self.drop_table("test_frame1")
        self.pandasSQL.to_sql(self.test_frame1.iloc[:0], "test_frame1")

    def _to_sql_fail(self):
        self.drop_table("test_frame1")

        self.pandasSQL.to_sql(self.test_frame1, "test_frame1", if_exists="fail")
        assert self.pandasSQL.has_table("test_frame1")

        msg = "Table 'test_frame1' already exists"
        with pytest.raises(ValueError, match=msg):
            self.pandasSQL.to_sql(self.test_frame1, "test_frame1", if_exists="fail")

        self.drop_table("test_frame1")

    def _to_sql_replace(self):
        self.drop_table("test_frame1")

        self.pandasSQL.to_sql(self.test_frame1, "test_frame1", if_exists="fail")
        # Add to table again
        self.pandasSQL.to_sql(self.test_frame1, "test_frame1", if_exists="replace")
        assert self.pandasSQL.has_table("test_frame1")

        num_entries = len(self.test_frame1)
        num_rows = self._count_rows("test_frame1")

        assert num_rows == num_entries
        self.drop_table("test_frame1")

    def _to_sql_append(self):
        # Nuke table just in case
        self.drop_table("test_frame1")

        self.pandasSQL.to_sql(self.test_frame1, "test_frame1", if_exists="fail")

        # Add to table again
        self.pandasSQL.to_sql(self.test_frame1, "test_frame1", if_exists="append")
        assert self.pandasSQL.has_table("test_frame1")

        num_entries = 2 * len(self.test_frame1)
        num_rows = self._count_rows("test_frame1")

        assert num_rows == num_entries
        self.drop_table("test_frame1")

    def _to_sql_method_callable(self):
        check = []  # used to double check function below is really being used

        def sample(pd_table, conn, keys, data_iter):
            check.append(1)
            data = [dict(zip(keys, row)) for row in data_iter]
            conn.execute(pd_table.table.insert(), data)

        self.drop_table("test_frame1")

        self.pandasSQL.to_sql(self.test_frame1, "test_frame1", method=sample)
        assert self.pandasSQL.has_table("test_frame1")

        assert check == [1]
        num_entries = len(self.test_frame1)
        num_rows = self._count_rows("test_frame1")
        assert num_rows == num_entries
        # Nuke table
        self.drop_table("test_frame1")

    def _roundtrip(self):
        self.drop_table("test_frame_roundtrip")
        self.pandasSQL.to_sql(self.test_frame1, "test_frame_roundtrip")
        result = self.pandasSQL.read_query("SELECT * FROM test_frame_roundtrip")

        result.set_index("level_0", inplace=True)
        # result.index.astype(int)

        result.index.name = None

        tm.assert_frame_equal(result, self.test_frame1)

    def _execute_sql(self):
        # drop_sql = "DROP TABLE IF EXISTS test"  # should already be done
        iris_results = self.pandasSQL.execute("SELECT * FROM iris")
        row = iris_results.fetchone()
        tm.equalContents(row, [5.1, 3.5, 1.4, 0.2, "Iris-setosa"])

    def _to_sql_save_index(self):
        df = DataFrame.from_records(
            [(1, 2.1, "line1"), (2, 1.5, "line2")], columns=["A", "B", "C"], index=["A"]
        )
        self.pandasSQL.to_sql(df, "test_to_sql_saves_index")
        ix_cols = self._get_index_columns("test_to_sql_saves_index")
        assert ix_cols == [["A"]]

    def _transaction_test(self):
        self.pandasSQL.execute("CREATE TABLE test_trans (A INT, B TEXT)")

        ins_sql = "INSERT INTO test_trans (A,B) VALUES (1, 'blah')"

        # Make sure when transaction is rolled back, no rows get inserted
        try:
            with self.pandasSQL.run_transaction() as trans:
                trans.execute(ins_sql)
                raise Exception("error")
        except Exception:
            # ignore raised exception
            pass
        res = self.pandasSQL.read_query("SELECT * FROM test_trans")
        assert len(res) == 0

        # Make sure when transaction is committed, rows do get inserted
        with self.pandasSQL.run_transaction() as trans:
            trans.execute(ins_sql)
        res2 = self.pandasSQL.read_query("SELECT * FROM test_trans")
        assert len(res2) == 1


# -----------------------------------------------------------------------------
# -- Testing the public API


class _TestSQLApi(PandasSQLTest):

    """
    Base class to test the public API.

    From this two classes are derived to run these tests for both the
    sqlalchemy mode (`TestSQLApi`) and the fallback mode
    (`TestSQLiteFallbackApi`).  These tests are run with sqlite3. Specific
    tests for the different sql flavours are included in `_TestSQLAlchemy`.

    Notes:
    flavor can always be passed even in SQLAlchemy mode,
    should be correctly ignored.

    we don't use drop_table because that isn't part of the public api

    """

    flavor = "sqlite"
    mode = None

    def setup_connect(self):
        self.conn = self.connect()

    @pytest.fixture(autouse=True)
    def setup_method(self, load_iris_data):
        self.load_test_data_and_sql()

    def load_test_data_and_sql(self):
        self._load_iris_view()
        self._load_test1_data()
        self._load_test2_data()
        self._load_test3_data()
        self._load_raw_sql()

    def test_read_sql_iris(self):
        iris_frame = sql.read_sql_query("SELECT * FROM iris", self.conn)
        self._check_iris_loaded_frame(iris_frame)

    def test_read_sql_view(self):
        iris_frame = sql.read_sql_query("SELECT * FROM iris_view", self.conn)
        self._check_iris_loaded_frame(iris_frame)

    def test_to_sql(self):
        sql.to_sql(self.test_frame1, "test_frame1", self.conn)
        assert sql.has_table("test_frame1", self.conn)

    def test_to_sql_fail(self):
        sql.to_sql(self.test_frame1, "test_frame2", self.conn, if_exists="fail")
        assert sql.has_table("test_frame2", self.conn)

        msg = "Table 'test_frame2' already exists"
        with pytest.raises(ValueError, match=msg):
            sql.to_sql(self.test_frame1, "test_frame2", self.conn, if_exists="fail")

    def test_to_sql_replace(self):
        sql.to_sql(self.test_frame1, "test_frame3", self.conn, if_exists="fail")
        # Add to table again
        sql.to_sql(self.test_frame1, "test_frame3", self.conn, if_exists="replace")
        assert sql.has_table("test_frame3", self.conn)

        num_entries = len(self.test_frame1)
        num_rows = self._count_rows("test_frame3")

        assert num_rows == num_entries

    def test_to_sql_append(self):
        sql.to_sql(self.test_frame1, "test_frame4", self.conn, if_exists="fail")

        # Add to table again
        sql.to_sql(self.test_frame1, "test_frame4", self.conn, if_exists="append")
        assert sql.has_table("test_frame4", self.conn)

        num_entries = 2 * len(self.test_frame1)
        num_rows = self._count_rows("test_frame4")

        assert num_rows == num_entries

    def test_to_sql_type_mapping(self):
        sql.to_sql(self.test_frame3, "test_frame5", self.conn, index=False)
        result = sql.read_sql("SELECT * FROM test_frame5", self.conn)

        tm.assert_frame_equal(self.test_frame3, result)

    def test_to_sql_series(self):
        s = Series(np.arange(5, dtype="int64"), name="series")
        sql.to_sql(s, "test_series", self.conn, index=False)
        s2 = sql.read_sql_query("SELECT * FROM test_series", self.conn)
        tm.assert_frame_equal(s.to_frame(), s2)

    def test_roundtrip(self):
        sql.to_sql(self.test_frame1, "test_frame_roundtrip", con=self.conn)
        result = sql.read_sql_query("SELECT * FROM test_frame_roundtrip", con=self.conn)

        # HACK!
        result.index = self.test_frame1.index
        result.set_index("level_0", inplace=True)
        result.index.astype(int)
        result.index.name = None
        tm.assert_frame_equal(result, self.test_frame1)

    def test_roundtrip_chunksize(self):
        sql.to_sql(
            self.test_frame1,
            "test_frame_roundtrip",
            con=self.conn,
            index=False,
            chunksize=2,
        )
        result = sql.read_sql_query("SELECT * FROM test_frame_roundtrip", con=self.conn)
        tm.assert_frame_equal(result, self.test_frame1)

    def test_execute_sql(self):
        # drop_sql = "DROP TABLE IF EXISTS test"  # should already be done
        iris_results = sql.execute("SELECT * FROM iris", con=self.conn)
        row = iris_results.fetchone()
        tm.equalContents(row, [5.1, 3.5, 1.4, 0.2, "Iris-setosa"])

    def test_date_parsing(self):
        # Test date parsing in read_sql
        # No Parsing
        df = sql.read_sql_query("SELECT * FROM types_test_data", self.conn)
        assert not issubclass(df.DateCol.dtype.type, np.datetime64)

        df = sql.read_sql_query(
            "SELECT * FROM types_test_data", self.conn, parse_dates=["DateCol"]
        )
        assert issubclass(df.DateCol.dtype.type, np.datetime64)
        assert df.DateCol.tolist() == [
            pd.Timestamp(2000, 1, 3, 0, 0, 0),
            pd.Timestamp(2000, 1, 4, 0, 0, 0),
        ]

        df = sql.read_sql_query(
            "SELECT * FROM types_test_data",
            self.conn,
            parse_dates={"DateCol": "%Y-%m-%d %H:%M:%S"},
        )
        assert issubclass(df.DateCol.dtype.type, np.datetime64)
        assert df.DateCol.tolist() == [
            pd.Timestamp(2000, 1, 3, 0, 0, 0),
            pd.Timestamp(2000, 1, 4, 0, 0, 0),
        ]

        df = sql.read_sql_query(
            "SELECT * FROM types_test_data", self.conn, parse_dates=["IntDateCol"]
        )
        assert issubclass(df.IntDateCol.dtype.type, np.datetime64)
        assert df.IntDateCol.tolist() == [
            pd.Timestamp(1986, 12, 25, 0, 0, 0),
            pd.Timestamp(2013, 1, 1, 0, 0, 0),
        ]

        df = sql.read_sql_query(
            "SELECT * FROM types_test_data", self.conn, parse_dates={"IntDateCol": "s"}
        )
        assert issubclass(df.IntDateCol.dtype.type, np.datetime64)
        assert df.IntDateCol.tolist() == [
            pd.Timestamp(1986, 12, 25, 0, 0, 0),
            pd.Timestamp(2013, 1, 1, 0, 0, 0),
        ]

        df = sql.read_sql_query(
            "SELECT * FROM types_test_data",
            self.conn,
            parse_dates={"IntDateOnlyCol": "%Y%m%d"},
        )
        assert issubclass(df.IntDateOnlyCol.dtype.type, np.datetime64)
        assert df.IntDateOnlyCol.tolist() == [
            pd.Timestamp("2010-10-10"),
            pd.Timestamp("2010-12-12"),
        ]

    def test_date_and_index(self):
        # Test case where same column appears in parse_date and index_col

        df = sql.read_sql_query(
            "SELECT * FROM types_test_data",
            self.conn,
            index_col="DateCol",
            parse_dates=["DateCol", "IntDateCol"],
        )

        assert issubclass(df.index.dtype.type, np.datetime64)
        assert issubclass(df.IntDateCol.dtype.type, np.datetime64)

    def test_timedelta(self):

        # see #6921
        df = to_timedelta(Series(["00:00:01", "00:00:03"], name="foo")).to_frame()
        with tm.assert_produces_warning(UserWarning):
            df.to_sql("test_timedelta", self.conn)
        result = sql.read_sql_query("SELECT * FROM test_timedelta", self.conn)
        tm.assert_series_equal(result["foo"], df["foo"].astype("int64"))

    def test_complex_raises(self):
        df = DataFrame({"a": [1 + 1j, 2j]})
        msg = "Complex datatypes not supported"
        with pytest.raises(ValueError, match=msg):
            df.to_sql("test_complex", self.conn)

    @pytest.mark.parametrize(
        "index_name,index_label,expected",
        [
            # no index name, defaults to 'index'
            (None, None, "index"),
            # specifying index_label
            (None, "other_label", "other_label"),
            # using the index name
            ("index_name", None, "index_name"),
            # has index name, but specifying index_label
            ("index_name", "other_label", "other_label"),
            # index name is integer
            (0, None, "0"),
            # index name is None but index label is integer
            (None, 0, "0"),
        ],
    )
    def test_to_sql_index_label(self, index_name, index_label, expected):
        temp_frame = DataFrame({"col1": range(4)})
        temp_frame.index.name = index_name
        query = "SELECT * FROM test_index_label"
        sql.to_sql(temp_frame, "test_index_label", self.conn, index_label=index_label)
        frame = sql.read_sql_query(query, self.conn)
        assert frame.columns[0] == expected

    def test_to_sql_index_label_multiindex(self):
        temp_frame = DataFrame(
            {"col1": range(4)},
            index=MultiIndex.from_product([("A0", "A1"), ("B0", "B1")]),
        )

        # no index name, defaults to 'level_0' and 'level_1'
        sql.to_sql(temp_frame, "test_index_label", self.conn)
        frame = sql.read_sql_query("SELECT * FROM test_index_label", self.conn)
        assert frame.columns[0] == "level_0"
        assert frame.columns[1] == "level_1"

        # specifying index_label
        sql.to_sql(
            temp_frame,
            "test_index_label",
            self.conn,
            if_exists="replace",
            index_label=["A", "B"],
        )
        frame = sql.read_sql_query("SELECT * FROM test_index_label", self.conn)
        assert frame.columns[:2].tolist() == ["A", "B"]

        # using the index name
        temp_frame.index.names = ["A", "B"]
        sql.to_sql(temp_frame, "test_index_label", self.conn, if_exists="replace")
        frame = sql.read_sql_query("SELECT * FROM test_index_label", self.conn)
        assert frame.columns[:2].tolist() == ["A", "B"]

        # has index name, but specifying index_label
        sql.to_sql(
            temp_frame,
            "test_index_label",
            self.conn,
            if_exists="replace",
            index_label=["C", "D"],
        )
        frame = sql.read_sql_query("SELECT * FROM test_index_label", self.conn)
        assert frame.columns[:2].tolist() == ["C", "D"]

        msg = "Length of 'index_label' should match number of levels, which is 2"
        with pytest.raises(ValueError, match=msg):
            sql.to_sql(
                temp_frame,
                "test_index_label",
                self.conn,
                if_exists="replace",
                index_label="C",
            )

    def test_multiindex_roundtrip(self):
        df = DataFrame.from_records(
            [(1, 2.1, "line1"), (2, 1.5, "line2")],
            columns=["A", "B", "C"],
            index=["A", "B"],
        )

        df.to_sql("test_multiindex_roundtrip", self.conn)
        result = sql.read_sql_query(
            "SELECT * FROM test_multiindex_roundtrip", self.conn, index_col=["A", "B"]
        )
        tm.assert_frame_equal(df, result, check_index_type=True)

    def test_integer_col_names(self):
        df = DataFrame([[1, 2], [3, 4]], columns=[0, 1])
        sql.to_sql(df, "test_frame_integer_col_names", self.conn, if_exists="replace")

    def test_get_schema(self):
        create_sql = sql.get_schema(self.test_frame1, "test", con=self.conn)
        assert "CREATE" in create_sql

    def test_get_schema_dtypes(self):
        float_frame = DataFrame({"a": [1.1, 1.2], "b": [2.1, 2.2]})
        dtype = sqlalchemy.Integer if self.mode == "sqlalchemy" else "INTEGER"
        create_sql = sql.get_schema(
            float_frame, "test", con=self.conn, dtype={"b": dtype}
        )
        assert "CREATE" in create_sql
        assert "INTEGER" in create_sql

    def test_get_schema_keys(self):
        frame = DataFrame({"Col1": [1.1, 1.2], "Col2": [2.1, 2.2]})
        create_sql = sql.get_schema(frame, "test", con=self.conn, keys="Col1")
        constraint_sentence = 'CONSTRAINT test_pk PRIMARY KEY ("Col1")'
        assert constraint_sentence in create_sql

        # multiple columns as key (GH10385)
        create_sql = sql.get_schema(
            self.test_frame1, "test", con=self.conn, keys=["A", "B"]
        )
        constraint_sentence = 'CONSTRAINT test_pk PRIMARY KEY ("A", "B")'
        assert constraint_sentence in create_sql

    def test_chunksize_read(self):
        df = DataFrame(np.random.randn(22, 5), columns=list("abcde"))
        df.to_sql("test_chunksize", self.conn, index=False)

        # reading the query in one time
        res1 = sql.read_sql_query("select * from test_chunksize", self.conn)

        # reading the query in chunks with read_sql_query
        res2 = DataFrame()
        i = 0
        sizes = [5, 5, 5, 5, 2]

        for chunk in sql.read_sql_query(
            "select * from test_chunksize", self.conn, chunksize=5
        ):
            res2 = concat([res2, chunk], ignore_index=True)
            assert len(chunk) == sizes[i]
            i += 1

        tm.assert_frame_equal(res1, res2)

        # reading the query in chunks with read_sql_query
        if self.mode == "sqlalchemy":
            res3 = DataFrame()
            i = 0
            sizes = [5, 5, 5, 5, 2]

            for chunk in sql.read_sql_table("test_chunksize", self.conn, chunksize=5):
                res3 = concat([res3, chunk], ignore_index=True)
                assert len(chunk) == sizes[i]
                i += 1

            tm.assert_frame_equal(res1, res3)

    def test_categorical(self):
        # GH8624
        # test that categorical gets written correctly as dense column
        df = DataFrame(
            {
                "person_id": [1, 2, 3],
                "person_name": ["John P. Doe", "Jane Dove", "John P. Doe"],
            }
        )
        df2 = df.copy()
        df2["person_name"] = df2["person_name"].astype("category")

        df2.to_sql("test_categorical", self.conn, index=False)
        res = sql.read_sql_query("SELECT * FROM test_categorical", self.conn)

        tm.assert_frame_equal(res, df)

    def test_unicode_column_name(self):
        # GH 11431
        df = DataFrame([[1, 2], [3, 4]], columns=["\xe9", "b"])
        df.to_sql("test_unicode", self.conn, index=False)

    def test_escaped_table_name(self):
        # GH 13206
        df = DataFrame({"A": [0, 1, 2], "B": [0.2, np.nan, 5.6]})
        df.to_sql("d1187b08-4943-4c8d-a7f6", self.conn, index=False)

        res = sql.read_sql_query("SELECT * FROM `d1187b08-4943-4c8d-a7f6`", self.conn)

        tm.assert_frame_equal(res, df)


@pytest.mark.single
@pytest.mark.skipif(not SQLALCHEMY_INSTALLED, reason="SQLAlchemy not installed")
class TestSQLApi(SQLAlchemyMixIn, _TestSQLApi):
    """
    Test the public API as it would be used directly

    Tests for `read_sql_table` are included here, as this is specific for the
    sqlalchemy mode.

    """

    flavor = "sqlite"
    mode = "sqlalchemy"

    def connect(self):
        return sqlalchemy.create_engine("sqlite:///:memory:")

    def test_read_table_columns(self):
        # test columns argument in read_table
        sql.to_sql(self.test_frame1, "test_frame", self.conn)

        cols = ["A", "B"]
        result = sql.read_sql_table("test_frame", self.conn, columns=cols)
        assert result.columns.tolist() == cols

    def test_read_table_index_col(self):
        # test columns argument in read_table
        sql.to_sql(self.test_frame1, "test_frame", self.conn)

        result = sql.read_sql_table("test_frame", self.conn, index_col="index")
        assert result.index.names == ["index"]

        result = sql.read_sql_table("test_frame", self.conn, index_col=["A", "B"])
        assert result.index.names == ["A", "B"]

        result = sql.read_sql_table(
            "test_frame", self.conn, index_col=["A", "B"], columns=["C", "D"]
        )
        assert result.index.names == ["A", "B"]
        assert result.columns.tolist() == ["C", "D"]

    def test_read_sql_delegate(self):
        iris_frame1 = sql.read_sql_query("SELECT * FROM iris", self.conn)
        iris_frame2 = sql.read_sql("SELECT * FROM iris", self.conn)
        tm.assert_frame_equal(iris_frame1, iris_frame2)

        iris_frame1 = sql.read_sql_table("iris", self.conn)
        iris_frame2 = sql.read_sql("iris", self.conn)
        tm.assert_frame_equal(iris_frame1, iris_frame2)

    def test_not_reflect_all_tables(self):
        # create invalid table
        qry = """CREATE TABLE invalid (x INTEGER, y UNKNOWN);"""
        self.conn.execute(qry)
        qry = """CREATE TABLE other_table (x INTEGER, y INTEGER);"""
        self.conn.execute(qry)

        with warnings.catch_warnings(record=True) as w:
            # Cause all warnings to always be triggered.
            warnings.simplefilter("always")
            # Trigger a warning.
            sql.read_sql_table("other_table", self.conn)
            sql.read_sql_query("SELECT * FROM other_table", self.conn)
            # Verify some things
            assert len(w) == 0

    def test_warning_case_insensitive_table_name(self):
        # see gh-7815
        #
        # We can't test that this warning is triggered, a the database
        # configuration would have to be altered. But here we test that
        # the warning is certainly NOT triggered in a normal case.
        with warnings.catch_warnings(record=True) as w:
            # Cause all warnings to always be triggered.
            warnings.simplefilter("always")
            # This should not trigger a Warning
            self.test_frame1.to_sql("CaseSensitive", self.conn)
            # Verify some things
            assert len(w) == 0

    def _get_index_columns(self, tbl_name):
        from sqlalchemy.engine import reflection

        insp = reflection.Inspector.from_engine(self.conn)
        ixs = insp.get_indexes("test_index_saved")
        ixs = [i["column_names"] for i in ixs]
        return ixs

    def test_sqlalchemy_type_mapping(self):

        # Test Timestamp objects (no datetime64 because of timezone) (GH9085)
        df = DataFrame(
            {"time": to_datetime(["201412120154", "201412110254"], utc=True)}
        )
        db = sql.SQLDatabase(self.conn)
        table = sql.SQLTable("test_type", db, frame=df)
        # GH 9086: TIMESTAMP is the suggested type for datetimes with timezones
        assert isinstance(table.table.c["time"].type, sqltypes.TIMESTAMP)

    def test_database_uri_string(self):

        # Test read_sql and .to_sql method with a database URI (GH10654)
        test_frame1 = self.test_frame1
        # db_uri = 'sqlite:///:memory:' # raises
        # sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) near
        # "iris": syntax error [SQL: 'iris']
        with tm.ensure_clean() as name:
            db_uri = "sqlite:///" + name
            table = "iris"
            test_frame1.to_sql(table, db_uri, if_exists="replace", index=False)
            test_frame2 = sql.read_sql(table, db_uri)
            test_frame3 = sql.read_sql_table(table, db_uri)
            query = "SELECT * FROM iris"
            test_frame4 = sql.read_sql_query(query, db_uri)
        tm.assert_frame_equal(test_frame1, test_frame2)
        tm.assert_frame_equal(test_frame1, test_frame3)
        tm.assert_frame_equal(test_frame1, test_frame4)

        # using driver that will not be installed on Travis to trigger error
        # in sqlalchemy.create_engine -> test passing of this error to user
        try:
            # the rest of this test depends on pg8000's being absent
            import pg8000  # noqa

            pytest.skip("pg8000 is installed")
        except ImportError:
            pass

        db_uri = "postgresql+pg8000://user:pass@host/dbname"
        with pytest.raises(ImportError, match="pg8000"):
            sql.read_sql("select * from table", db_uri)

    def _make_iris_table_metadata(self):
        sa = sqlalchemy
        metadata = sa.MetaData()
        iris = sa.Table(
            "iris",
            metadata,
            sa.Column("SepalLength", sa.REAL),
            sa.Column("SepalWidth", sa.REAL),
            sa.Column("PetalLength", sa.REAL),
            sa.Column("PetalWidth", sa.REAL),
            sa.Column("Name", sa.TEXT),
        )

        return iris

    def test_query_by_text_obj(self):
        # WIP : GH10846
        name_text = sqlalchemy.text("select * from iris where name=:name")
        iris_df = sql.read_sql(name_text, self.conn, params={"name": "Iris-versicolor"})
        all_names = set(iris_df["Name"])
        assert all_names == {"Iris-versicolor"}

    def test_query_by_select_obj(self):
        # WIP : GH10846
        iris = self._make_iris_table_metadata()

        name_select = sqlalchemy.select([iris]).where(
            iris.c.Name == sqlalchemy.bindparam("name")
        )
        iris_df = sql.read_sql(name_select, self.conn, params={"name": "Iris-setosa"})
        all_names = set(iris_df["Name"])
        assert all_names == {"Iris-setosa"}


class _EngineToConnMixin:
    """
    A mixin that causes setup_connect to create a conn rather than an engine.
    """

    @pytest.fixture(autouse=True)
    def setup_method(self, load_iris_data):
        super().load_test_data_and_sql()
        engine = self.conn
        conn = engine.connect()
        self.__tx = conn.begin()
        self.pandasSQL = sql.SQLDatabase(conn)
        self.__engine = engine
        self.conn = conn

        yield

        self.__tx.rollback()
        self.conn.close()
        self.conn = self.__engine
        self.pandasSQL = sql.SQLDatabase(self.__engine)
        # XXX:
        # super().teardown_method(method)


@pytest.mark.single
class TestSQLApiConn(_EngineToConnMixin, TestSQLApi):
    pass


@pytest.mark.single
class TestSQLiteFallbackApi(SQLiteMixIn, _TestSQLApi):
    """
    Test the public sqlite connection fallback API

    """

    flavor = "sqlite"
    mode = "fallback"

    def connect(self, database=":memory:"):
        return sqlite3.connect(database)

    def test_sql_open_close(self):
        # Test if the IO in the database still work if the connection closed
        # between the writing and reading (as in many real situations).

        with tm.ensure_clean() as name:

            conn = self.connect(name)
            sql.to_sql(self.test_frame3, "test_frame3_legacy", conn, index=False)
            conn.close()

            conn = self.connect(name)
            result = sql.read_sql_query("SELECT * FROM test_frame3_legacy;", conn)
            conn.close()

        tm.assert_frame_equal(self.test_frame3, result)

    @pytest.mark.skipif(SQLALCHEMY_INSTALLED, reason="SQLAlchemy is installed")
    def test_con_string_import_error(self):
        conn = "mysql://root@localhost/pandas_nosetest"
        msg = "Using URI string without sqlalchemy installed"
        with pytest.raises(ImportError, match=msg):
            sql.read_sql("SELECT * FROM iris", conn)

    def test_read_sql_delegate(self):
        iris_frame1 = sql.read_sql_query("SELECT * FROM iris", self.conn)
        iris_frame2 = sql.read_sql("SELECT * FROM iris", self.conn)
        tm.assert_frame_equal(iris_frame1, iris_frame2)

        msg = "Execution failed on sql 'iris': near \"iris\": syntax error"
        with pytest.raises(sql.DatabaseError, match=msg):
            sql.read_sql("iris", self.conn)

    def test_safe_names_warning(self):
        # GH 6798
        df = DataFrame([[1, 2], [3, 4]], columns=["a", "b "])  # has a space
        # warns on create table with spaces in names
        with tm.assert_produces_warning():
            sql.to_sql(df, "test_frame3_legacy", self.conn, index=False)

    def test_get_schema2(self):
        # without providing a connection object (available for backwards comp)
        create_sql = sql.get_schema(self.test_frame1, "test")
        assert "CREATE" in create_sql

    def _get_sqlite_column_type(self, schema, column):

        for col in schema.split("\n"):
            if col.split()[0].strip('""') == column:
                return col.split()[1]
        raise ValueError("Column {column} not found".format(column=column))

    def test_sqlite_type_mapping(self):

        # Test Timestamp objects (no datetime64 because of timezone) (GH9085)
        df = DataFrame(
            {"time": to_datetime(["201412120154", "201412110254"], utc=True)}
        )
        db = sql.SQLiteDatabase(self.conn)
        table = sql.SQLiteTable("test_type", db, frame=df)
        schema = table.sql_schema()
        assert self._get_sqlite_column_type(schema, "time") == "TIMESTAMP"


# -----------------------------------------------------------------------------
# -- Database flavor specific tests


class _TestSQLAlchemy(SQLAlchemyMixIn, PandasSQLTest):
    """
    Base class for testing the sqlalchemy backend.

    Subclasses for specific database types are created below. Tests that
    deviate for each flavor are overwritten there.

    """

    flavor = None

    @pytest.fixture(autouse=True, scope="class")
    def setup_class(cls):
        cls.setup_import()
        cls.setup_driver()
        conn = cls.connect()
        conn.connect()

    def load_test_data_and_sql(self):
        self._load_raw_sql()
        self._load_test1_data()

    @pytest.fixture(autouse=True)
    def setup_method(self, load_iris_data):
        self.load_test_data_and_sql()

    @classmethod
    def setup_import(cls):
        # Skip this test if SQLAlchemy not available
        if not SQLALCHEMY_INSTALLED:
            pytest.skip("SQLAlchemy not installed")

    @classmethod
    def setup_driver(cls):
        raise NotImplementedError()

    @classmethod
    def connect(cls):
        raise NotImplementedError()

    def setup_connect(self):
        try:
            self.conn = self.connect()
            self.pandasSQL = sql.SQLDatabase(self.conn)
            # to test if connection can be made:
            self.conn.connect()
        except sqlalchemy.exc.OperationalError:
            pytest.skip("Can't connect to {0} server".format(self.flavor))

    def test_read_sql(self):
        self._read_sql_iris()

    def test_read_sql_parameter(self):
        self._read_sql_iris_parameter()

    def test_read_sql_named_parameter(self):
        self._read_sql_iris_named_parameter()

    def test_to_sql(self):
        self._to_sql()

    def test_to_sql_empty(self):
        self._to_sql_empty()

    def test_to_sql_fail(self):
        self._to_sql_fail()

    def test_to_sql_replace(self):
        self._to_sql_replace()

    def test_to_sql_append(self):
        self._to_sql_append()

    def test_to_sql_method_multi(self):
        self._to_sql(method="multi")

    def test_to_sql_method_callable(self):
        self._to_sql_method_callable()

    def test_create_table(self):
        temp_conn = self.connect()
        temp_frame = DataFrame(
            {"one": [1.0, 2.0, 3.0, 4.0], "two": [4.0, 3.0, 2.0, 1.0]}
        )

        pandasSQL = sql.SQLDatabase(temp_conn)
        pandasSQL.to_sql(temp_frame, "temp_frame")

        assert temp_conn.has_table("temp_frame")

    def test_drop_table(self):
        temp_conn = self.connect()

        temp_frame = DataFrame(
            {"one": [1.0, 2.0, 3.0, 4.0], "two": [4.0, 3.0, 2.0, 1.0]}
        )

        pandasSQL = sql.SQLDatabase(temp_conn)
        pandasSQL.to_sql(temp_frame, "temp_frame")

        assert temp_conn.has_table("temp_frame")

        pandasSQL.drop_table("temp_frame")

        assert not temp_conn.has_table("temp_frame")

    def test_roundtrip(self):
        self._roundtrip()

    def test_execute_sql(self):
        self._execute_sql()

    def test_read_table(self):
        iris_frame = sql.read_sql_table("iris", con=self.conn)
        self._check_iris_loaded_frame(iris_frame)

    def test_read_table_columns(self):
        iris_frame = sql.read_sql_table(
            "iris", con=self.conn, columns=["SepalLength", "SepalLength"]
        )
        tm.equalContents(iris_frame.columns.values, ["SepalLength", "SepalLength"])

    def test_read_table_absent_raises(self):
        msg = "Table this_doesnt_exist not found"
        with pytest.raises(ValueError, match=msg):
            sql.read_sql_table("this_doesnt_exist", con=self.conn)

    def test_default_type_conversion(self):
        df = sql.read_sql_table("types_test_data", self.conn)

        assert issubclass(df.FloatCol.dtype.type, np.floating)
        assert issubclass(df.IntCol.dtype.type, np.integer)
        assert issubclass(df.BoolCol.dtype.type, np.bool_)

        # Int column with NA values stays as float
        assert issubclass(df.IntColWithNull.dtype.type, np.floating)
        # Bool column with NA values becomes object
        assert issubclass(df.BoolColWithNull.dtype.type, np.object)

    def test_bigint(self):
        # int64 should be converted to BigInteger, GH7433
        df = DataFrame(data={"i64": [2 ** 62]})
        df.to_sql("test_bigint", self.conn, index=False)
        result = sql.read_sql_table("test_bigint", self.conn)

        tm.assert_frame_equal(df, result)

    def test_default_date_load(self):
        df = sql.read_sql_table("types_test_data", self.conn)

        # IMPORTANT - sqlite has no native date type, so shouldn't parse, but
        # MySQL SHOULD be converted.
        assert issubclass(df.DateCol.dtype.type, np.datetime64)

    def test_datetime_with_timezone(self):
        # edge case that converts postgresql datetime with time zone types
        # to datetime64[ns,psycopg2.tz.FixedOffsetTimezone..], which is ok
        # but should be more natural, so coerce to datetime64[ns] for now

        def check(col):
            # check that a column is either datetime64[ns]
            # or datetime64[ns, UTC]
            if is_datetime64_dtype(col.dtype):

                # "2000-01-01 00:00:00-08:00" should convert to
                # "2000-01-01 08:00:00"
                assert col[0] == Timestamp("2000-01-01 08:00:00")

                # "2000-06-01 00:00:00-07:00" should convert to
                # "2000-06-01 07:00:00"
                assert col[1] == Timestamp("2000-06-01 07:00:00")

            elif is_datetime64tz_dtype(col.dtype):
                assert str(col.dt.tz) == "UTC"

                # "2000-01-01 00:00:00-08:00" should convert to
                # "2000-01-01 08:00:00"
                # "2000-06-01 00:00:00-07:00" should convert to
                # "2000-06-01 07:00:00"
                # GH 6415
                expected_data = [
                    Timestamp("2000-01-01 08:00:00", tz="UTC"),
                    Timestamp("2000-06-01 07:00:00", tz="UTC"),
                ]
                expected = Series(expected_data, name=col.name)
                tm.assert_series_equal(col, expected)

            else:
                raise AssertionError(
                    "DateCol loaded with incorrect type -> {0}".format(col.dtype)
                )

        # GH11216
        df = pd.read_sql_query("select * from types_test_data", self.conn)
        if not hasattr(df, "DateColWithTz"):
            pytest.skip("no column with datetime with time zone")

        # this is parsed on Travis (linux), but not on macosx for some reason
        # even with the same versions of psycopg2 & sqlalchemy, possibly a
        # Postgresql server version difference
        col = df.DateColWithTz
        assert is_datetime64tz_dtype(col.dtype)

        df = pd.read_sql_query(
            "select * from types_test_data", self.conn, parse_dates=["DateColWithTz"]
        )
        if not hasattr(df, "DateColWithTz"):
            pytest.skip("no column with datetime with time zone")
        col = df.DateColWithTz
        assert is_datetime64tz_dtype(col.dtype)
        assert str(col.dt.tz) == "UTC"
        check(df.DateColWithTz)

        df = pd.concat(
            list(
                pd.read_sql_query(
                    "select * from types_test_data", self.conn, chunksize=1
                )
            ),
            ignore_index=True,
        )
        col = df.DateColWithTz
        assert is_datetime64tz_dtype(col.dtype)
        assert str(col.dt.tz) == "UTC"
        expected = sql.read_sql_table("types_test_data", self.conn)
        col = expected.DateColWithTz
        assert is_datetime64tz_dtype(col.dtype)
        tm.assert_series_equal(df.DateColWithTz, expected.DateColWithTz)

        # xref #7139
        # this might or might not be converted depending on the postgres driver
        df = sql.read_sql_table("types_test_data", self.conn)
        check(df.DateColWithTz)

    def test_datetime_with_timezone_roundtrip(self):
        # GH 9086
        # Write datetimetz data to a db and read it back
        # For dbs that support timestamps with timezones, should get back UTC
        # otherwise naive data should be returned
        expected = DataFrame(
            {"A": date_range("2013-01-01 09:00:00", periods=3, tz="US/Pacific")}
        )
        expected.to_sql("test_datetime_tz", self.conn, index=False)

        if self.flavor == "postgresql":
            # SQLAlchemy "timezones" (i.e. offsets) are coerced to UTC
            expected["A"] = expected["A"].dt.tz_convert("UTC")
        else:
            # Otherwise, timestamps are returned as local, naive
            expected["A"] = expected["A"].dt.tz_localize(None)

        result = sql.read_sql_table("test_datetime_tz", self.conn)
        tm.assert_frame_equal(result, expected)

        result = sql.read_sql_query("SELECT * FROM test_datetime_tz", self.conn)
        if self.flavor == "sqlite":
            # read_sql_query does not return datetime type like read_sql_table
            assert isinstance(result.loc[0, "A"], str)
            result["A"] = to_datetime(result["A"])
        tm.assert_frame_equal(result, expected)

    def test_naive_datetimeindex_roundtrip(self):
        # GH 23510
        # Ensure that a naive DatetimeIndex isn't converted to UTC
        dates = date_range("2018-01-01", periods=5, freq="6H")
        expected = DataFrame({"nums": range(5)}, index=dates)
        expected.to_sql("foo_table", self.conn, index_label="info_date")
        result = sql.read_sql_table("foo_table", self.conn, index_col="info_date")
        # result index with gain a name from a set_index operation; expected
        tm.assert_frame_equal(result, expected, check_names=False)

    def test_date_parsing(self):
        # No Parsing
        df = sql.read_sql_table("types_test_data", self.conn)
        expected_type = object if self.flavor == "sqlite" else np.datetime64
        assert issubclass(df.DateCol.dtype.type, expected_type)

        df = sql.read_sql_table("types_test_data", self.conn, parse_dates=["DateCol"])
        assert issubclass(df.DateCol.dtype.type, np.datetime64)

        df = sql.read_sql_table(
            "types_test_data", self.conn, parse_dates={"DateCol": "%Y-%m-%d %H:%M:%S"}
        )
        assert issubclass(df.DateCol.dtype.type, np.datetime64)

        df = sql.read_sql_table(
            "types_test_data",
            self.conn,
            parse_dates={"DateCol": {"format": "%Y-%m-%d %H:%M:%S"}},
        )
        assert issubclass(df.DateCol.dtype.type, np.datetime64)

        df = sql.read_sql_table(
            "types_test_data", self.conn, parse_dates=["IntDateCol"]
        )
        assert issubclass(df.IntDateCol.dtype.type, np.datetime64)

        df = sql.read_sql_table(
            "types_test_data", self.conn, parse_dates={"IntDateCol": "s"}
        )
        assert issubclass(df.IntDateCol.dtype.type, np.datetime64)

        df = sql.read_sql_table(
            "types_test_data", self.conn, parse_dates={"IntDateCol": {"unit": "s"}}
        )
        assert issubclass(df.IntDateCol.dtype.type, np.datetime64)

    def test_datetime(self):
        df = DataFrame(
            {"A": date_range("2013-01-01 09:00:00", periods=3), "B": np.arange(3.0)}
        )
        df.to_sql("test_datetime", self.conn)

        # with read_table -> type information from schema used
        result = sql.read_sql_table("test_datetime", self.conn)
        result = result.drop("index", axis=1)
        tm.assert_frame_equal(result, df)

        # with read_sql -> no type information -> sqlite has no native
        result = sql.read_sql_query("SELECT * FROM test_datetime", self.conn)
        result = result.drop("index", axis=1)
        if self.flavor == "sqlite":
            assert isinstance(result.loc[0, "A"], str)
            result["A"] = to_datetime(result["A"])
            tm.assert_frame_equal(result, df)
        else:
            tm.assert_frame_equal(result, df)

    def test_datetime_NaT(self):
        df = DataFrame(
            {"A": date_range("2013-01-01 09:00:00", periods=3), "B": np.arange(3.0)}
        )
        df.loc[1, "A"] = np.nan
        df.to_sql("test_datetime", self.conn, index=False)

        # with read_table -> type information from schema used
        result = sql.read_sql_table("test_datetime", self.conn)
        tm.assert_frame_equal(result, df)

        # with read_sql -> no type information -> sqlite has no native
        result = sql.read_sql_query("SELECT * FROM test_datetime", self.conn)
        if self.flavor == "sqlite":
            assert isinstance(result.loc[0, "A"], str)
            result["A"] = to_datetime(result["A"], errors="coerce")
            tm.assert_frame_equal(result, df)
        else:
            tm.assert_frame_equal(result, df)

    def test_datetime_date(self):
        # test support for datetime.date
        df = DataFrame([date(2014, 1, 1), date(2014, 1, 2)], columns=["a"])
        df.to_sql("test_date", self.conn, index=False)
        res = read_sql_table("test_date", self.conn)
        result = res["a"]
        expected = to_datetime(df["a"])
        # comes back as datetime64
        tm.assert_series_equal(result, expected)

    def test_datetime_time(self):
        # test support for datetime.time
        df = DataFrame([time(9, 0, 0), time(9, 1, 30)], columns=["a"])
        df.to_sql("test_time", self.conn, index=False)
        res = read_sql_table("test_time", self.conn)
        tm.assert_frame_equal(res, df)

        # GH8341
        # first, use the fallback to have the sqlite adapter put in place
        sqlite_conn = TestSQLiteFallback.connect()
        sql.to_sql(df, "test_time2", sqlite_conn, index=False)
        res = sql.read_sql_query("SELECT * FROM test_time2", sqlite_conn)
        ref = df.applymap(lambda _: _.strftime("%H:%M:%S.%f"))
        tm.assert_frame_equal(ref, res)  # check if adapter is in place
        # then test if sqlalchemy is unaffected by the sqlite adapter
        sql.to_sql(df, "test_time3", self.conn, index=False)
        if self.flavor == "sqlite":
            res = sql.read_sql_query("SELECT * FROM test_time3", self.conn)
            ref = df.applymap(lambda _: _.strftime("%H:%M:%S.%f"))
            tm.assert_frame_equal(ref, res)
        res = sql.read_sql_table("test_time3", self.conn)
        tm.assert_frame_equal(df, res)

    def test_mixed_dtype_insert(self):
        # see GH6509
        s1 = Series(2 ** 25 + 1, dtype=np.int32)
        s2 = Series(0.0, dtype=np.float32)
        df = DataFrame({"s1": s1, "s2": s2})

        # write and read again
        df.to_sql("test_read_write", self.conn, index=False)
        df2 = sql.read_sql_table("test_read_write", self.conn)

        tm.assert_frame_equal(df, df2, check_dtype=False, check_exact=True)

    def test_nan_numeric(self):
        # NaNs in numeric float column
        df = DataFrame({"A": [0, 1, 2], "B": [0.2, np.nan, 5.6]})
        df.to_sql("test_nan", self.conn, index=False)

        # with read_table
        result = sql.read_sql_table("test_nan", self.conn)
        tm.assert_frame_equal(result, df)

        # with read_sql
        result = sql.read_sql_query("SELECT * FROM test_nan", self.conn)
        tm.assert_frame_equal(result, df)

    def test_nan_fullcolumn(self):
        # full NaN column (numeric float column)
        df = DataFrame({"A": [0, 1, 2], "B": [np.nan, np.nan, np.nan]})
        df.to_sql("test_nan", self.conn, index=False)

        # with read_table
        result = sql.read_sql_table("test_nan", self.conn)
        tm.assert_frame_equal(result, df)

        # with read_sql -> not type info from table -> stays None
        df["B"] = df["B"].astype("object")
        df["B"] = None
        result = sql.read_sql_query("SELECT * FROM test_nan", self.conn)
        tm.assert_frame_equal(result, df)

    def test_nan_string(self):
        # NaNs in string column
        df = DataFrame({"A": [0, 1, 2], "B": ["a", "b", np.nan]})
        df.to_sql("test_nan", self.conn, index=False)

        # NaNs are coming back as None
        df.loc[2, "B"] = None

        # with read_table
        result = sql.read_sql_table("test_nan", self.conn)
        tm.assert_frame_equal(result, df)

        # with read_sql
        result = sql.read_sql_query("SELECT * FROM test_nan", self.conn)
        tm.assert_frame_equal(result, df)

    def _get_index_columns(self, tbl_name):
        from sqlalchemy.engine import reflection

        insp = reflection.Inspector.from_engine(self.conn)
        ixs = insp.get_indexes(tbl_name)
        ixs = [i["column_names"] for i in ixs]
        return ixs

    def test_to_sql_save_index(self):
        self._to_sql_save_index()

    def test_transactions(self):
        self._transaction_test()

    def test_get_schema_create_table(self):
        # Use a dataframe without a bool column, since MySQL converts bool to
        # TINYINT (which read_sql_table returns as an int and causes a dtype
        # mismatch)

        self._load_test3_data()
        tbl = "test_get_schema_create_table"
        create_sql = sql.get_schema(self.test_frame3, tbl, con=self.conn)
        blank_test_df = self.test_frame3.iloc[:0]

        self.drop_table(tbl)
        self.conn.execute(create_sql)
        returned_df = sql.read_sql_table(tbl, self.conn)
        tm.assert_frame_equal(returned_df, blank_test_df, check_index_type=False)
        self.drop_table(tbl)

    def test_dtype(self):
        cols = ["A", "B"]
        data = [(0.8, True), (0.9, None)]
        df = DataFrame(data, columns=cols)
        df.to_sql("dtype_test", self.conn)
        df.to_sql("dtype_test2", self.conn, dtype={"B": sqlalchemy.TEXT})
        meta = sqlalchemy.schema.MetaData(bind=self.conn)
        meta.reflect()
        sqltype = meta.tables["dtype_test2"].columns["B"].type
        assert isinstance(sqltype, sqlalchemy.TEXT)
        msg = "The type of B is not a SQLAlchemy type"
        with pytest.raises(ValueError, match=msg):
            df.to_sql("error", self.conn, dtype={"B": str})

        # GH9083
        df.to_sql("dtype_test3", self.conn, dtype={"B": sqlalchemy.String(10)})
        meta.reflect()
        sqltype = meta.tables["dtype_test3"].columns["B"].type
        assert isinstance(sqltype, sqlalchemy.String)
        assert sqltype.length == 10

        # single dtype
        df.to_sql("single_dtype_test", self.conn, dtype=sqlalchemy.TEXT)
        meta = sqlalchemy.schema.MetaData(bind=self.conn)
        meta.reflect()
        sqltypea = meta.tables["single_dtype_test"].columns["A"].type
        sqltypeb = meta.tables["single_dtype_test"].columns["B"].type
        assert isinstance(sqltypea, sqlalchemy.TEXT)
        assert isinstance(sqltypeb, sqlalchemy.TEXT)

    def test_notna_dtype(self):
        cols = {
            "Bool": Series([True, None]),
            "Date": Series([datetime(2012, 5, 1), None]),
            "Int": Series([1, None], dtype="object"),
            "Float": Series([1.1, None]),
        }
        df = DataFrame(cols)

        tbl = "notna_dtype_test"
        df.to_sql(tbl, self.conn)
        returned_df = sql.read_sql_table(tbl, self.conn)  # noqa
        meta = sqlalchemy.schema.MetaData(bind=self.conn)
        meta.reflect()
        if self.flavor == "mysql":
            my_type = sqltypes.Integer
        else:
            my_type = sqltypes.Boolean

        col_dict = meta.tables[tbl].columns

        assert isinstance(col_dict["Bool"].type, my_type)
        assert isinstance(col_dict["Date"].type, sqltypes.DateTime)
        assert isinstance(col_dict["Int"].type, sqltypes.Integer)
        assert isinstance(col_dict["Float"].type, sqltypes.Float)

    def test_double_precision(self):
        V = 1.23456789101112131415

        df = DataFrame(
            {
                "f32": Series([V], dtype="float32"),
                "f64": Series([V], dtype="float64"),
                "f64_as_f32": Series([V], dtype="float64"),
                "i32": Series([5], dtype="int32"),
                "i64": Series([5], dtype="int64"),
            }
        )

        df.to_sql(
            "test_dtypes",
            self.conn,
            index=False,
            if_exists="replace",
            dtype={"f64_as_f32": sqlalchemy.Float(precision=23)},
        )
        res = sql.read_sql_table("test_dtypes", self.conn)

        # check precision of float64
        assert np.round(df["f64"].iloc[0], 14) == np.round(res["f64"].iloc[0], 14)

        # check sql types
        meta = sqlalchemy.schema.MetaData(bind=self.conn)
        meta.reflect()
        col_dict = meta.tables["test_dtypes"].columns
        assert str(col_dict["f32"].type) == str(col_dict["f64_as_f32"].type)
        assert isinstance(col_dict["f32"].type, sqltypes.Float)
        assert isinstance(col_dict["f64"].type, sqltypes.Float)
        assert isinstance(col_dict["i32"].type, sqltypes.Integer)
        assert isinstance(col_dict["i64"].type, sqltypes.BigInteger)

    def test_connectable_issue_example(self):
        # This tests the example raised in issue
        # https://github.com/pandas-dev/pandas/issues/10104

        def foo(connection):
            query = "SELECT test_foo_data FROM test_foo_data"
            return sql.read_sql_query(query, con=connection)

        def bar(connection, data):
            data.to_sql(name="test_foo_data", con=connection, if_exists="append")

        def main(connectable):
            with connectable.connect() as conn:
                with conn.begin():
                    foo_data = conn.run_callable(foo)
                    conn.run_callable(bar, foo_data)

        DataFrame({"test_foo_data": [0, 1, 2]}).to_sql("test_foo_data", self.conn)
        main(self.conn)

    def test_temporary_table(self):
        test_data = "Hello, World!"
        expected = DataFrame({"spam": [test_data]})
        Base = declarative.declarative_base()

        class Temporary(Base):
            __tablename__ = "temp_test"
            __table_args__ = {"prefixes": ["TEMPORARY"]}
            id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
            spam = sqlalchemy.Column(sqlalchemy.Unicode(30), nullable=False)

        Session = sa_session.sessionmaker(bind=self.conn)
        session = Session()
        with session.transaction:
            conn = session.connection()
            Temporary.__table__.create(conn)
            session.add(Temporary(spam=test_data))
            session.flush()
            df = sql.read_sql_query(sql=sqlalchemy.select([Temporary.spam]), con=conn)

        tm.assert_frame_equal(df, expected)


class _TestSQLAlchemyConn(_EngineToConnMixin, _TestSQLAlchemy):
    def test_transactions(self):
        pytest.skip("Nested transactions rollbacks don't work with Pandas")


class _TestSQLiteAlchemy:
    """
    Test the sqlalchemy backend against an in-memory sqlite database.

    """

    flavor = "sqlite"

    @classmethod
    def connect(cls):
        return sqlalchemy.create_engine("sqlite:///:memory:")

    @classmethod
    def setup_driver(cls):
        # sqlite3 is built-in
        cls.driver = None

    def test_default_type_conversion(self):
        df = sql.read_sql_table("types_test_data", self.conn)

        assert issubclass(df.FloatCol.dtype.type, np.floating)
        assert issubclass(df.IntCol.dtype.type, np.integer)

        # sqlite has no boolean type, so integer type is returned
        assert issubclass(df.BoolCol.dtype.type, np.integer)

        # Int column with NA values stays as float
        assert issubclass(df.IntColWithNull.dtype.type, np.floating)

        # Non-native Bool column with NA values stays as float
        assert issubclass(df.BoolColWithNull.dtype.type, np.floating)

    def test_default_date_load(self):
        df = sql.read_sql_table("types_test_data", self.conn)

        # IMPORTANT - sqlite has no native date type, so shouldn't parse, but
        assert not issubclass(df.DateCol.dtype.type, np.datetime64)

    def test_bigint_warning(self):
        # test no warning for BIGINT (to support int64) is raised (GH7433)
        df = DataFrame({"a": [1, 2]}, dtype="int64")
        df.to_sql("test_bigintwarning", self.conn, index=False)

        with warnings.catch_warnings(record=True) as w:
            warnings.simplefilter("always")
            sql.read_sql_table("test_bigintwarning", self.conn)
            assert len(w) == 0


class _TestMySQLAlchemy:
    """
    Test the sqlalchemy backend against an MySQL database.

    """

    flavor = "mysql"

    @classmethod
    def connect(cls):
        url = "mysql+{driver}://root@localhost/pandas_nosetest"
        return sqlalchemy.create_engine(
            url.format(driver=cls.driver), connect_args=cls.connect_args
        )

    @classmethod
    def setup_driver(cls):
        pymysql = pytest.importorskip("pymysql")
        cls.driver = "pymysql"
        cls.connect_args = {"client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS}

    def test_default_type_conversion(self):
        df = sql.read_sql_table("types_test_data", self.conn)

        assert issubclass(df.FloatCol.dtype.type, np.floating)
        assert issubclass(df.IntCol.dtype.type, np.integer)

        # MySQL has no real BOOL type (it's an alias for TINYINT)
        assert issubclass(df.BoolCol.dtype.type, np.integer)

        # Int column with NA values stays as float
        assert issubclass(df.IntColWithNull.dtype.type, np.floating)

        # Bool column with NA = int column with NA values => becomes float
        assert issubclass(df.BoolColWithNull.dtype.type, np.floating)

    def test_read_procedure(self):
        import pymysql

        # see GH7324. Although it is more an api test, it is added to the
        # mysql tests as sqlite does not have stored procedures
        df = DataFrame({"a": [1, 2, 3], "b": [0.1, 0.2, 0.3]})
        df.to_sql("test_procedure", self.conn, index=False)

        proc = """DROP PROCEDURE IF EXISTS get_testdb;

        CREATE PROCEDURE get_testdb ()

        BEGIN
            SELECT * FROM test_procedure;
        END"""

        connection = self.conn.connect()
        trans = connection.begin()
        try:
            r1 = connection.execute(proc)  # noqa
            trans.commit()
        except pymysql.Error:
            trans.rollback()
            raise

        res1 = sql.read_sql_query("CALL get_testdb();", self.conn)
        tm.assert_frame_equal(df, res1)

        # test delegation to read_sql_query
        res2 = sql.read_sql("CALL get_testdb();", self.conn)
        tm.assert_frame_equal(df, res2)


class _TestPostgreSQLAlchemy:
    """
    Test the sqlalchemy backend against an PostgreSQL database.

    """

    flavor = "postgresql"

    @classmethod
    def connect(cls):
        url = "postgresql+{driver}://postgres@localhost/pandas_nosetest"
        return sqlalchemy.create_engine(url.format(driver=cls.driver))

    @classmethod
    def setup_driver(cls):
        pytest.importorskip("psycopg2")
        cls.driver = "psycopg2"

    def test_schema_support(self):
        # only test this for postgresql (schema's not supported in
        # mysql/sqlite)
        df = DataFrame({"col1": [1, 2], "col2": [0.1, 0.2], "col3": ["a", "n"]})

        # create a schema
        self.conn.execute("DROP SCHEMA IF EXISTS other CASCADE;")
        self.conn.execute("CREATE SCHEMA other;")

        # write dataframe to different schema's
        df.to_sql("test_schema_public", self.conn, index=False)
        df.to_sql(
            "test_schema_public_explicit", self.conn, index=False, schema="public"
        )
        df.to_sql("test_schema_other", self.conn, index=False, schema="other")

        # read dataframes back in
        res1 = sql.read_sql_table("test_schema_public", self.conn)
        tm.assert_frame_equal(df, res1)
        res2 = sql.read_sql_table("test_schema_public_explicit", self.conn)
        tm.assert_frame_equal(df, res2)
        res3 = sql.read_sql_table(
            "test_schema_public_explicit", self.conn, schema="public"
        )
        tm.assert_frame_equal(df, res3)
        res4 = sql.read_sql_table("test_schema_other", self.conn, schema="other")
        tm.assert_frame_equal(df, res4)
        msg = "Table test_schema_other not found"
        with pytest.raises(ValueError, match=msg):
            sql.read_sql_table("test_schema_other", self.conn, schema="public")

        # different if_exists options

        # create a schema
        self.conn.execute("DROP SCHEMA IF EXISTS other CASCADE;")
        self.conn.execute("CREATE SCHEMA other;")

        # write dataframe with different if_exists options
        df.to_sql("test_schema_other", self.conn, schema="other", index=False)
        df.to_sql(
            "test_schema_other",
            self.conn,
            schema="other",
            index=False,
            if_exists="replace",
        )
        df.to_sql(
            "test_schema_other",
            self.conn,
            schema="other",
            index=False,
            if_exists="append",
        )
        res = sql.read_sql_table("test_schema_other", self.conn, schema="other")
        tm.assert_frame_equal(concat([df, df], ignore_index=True), res)

        # specifying schema in user-provided meta

        # The schema won't be applied on another Connection
        # because of transactional schemas
        if isinstance(self.conn, sqlalchemy.engine.Engine):
            engine2 = self.connect()
            meta = sqlalchemy.MetaData(engine2, schema="other")
            pdsql = sql.SQLDatabase(engine2, meta=meta)
            pdsql.to_sql(df, "test_schema_other2", index=False)
            pdsql.to_sql(df, "test_schema_other2", index=False, if_exists="replace")
            pdsql.to_sql(df, "test_schema_other2", index=False, if_exists="append")
            res1 = sql.read_sql_table("test_schema_other2", self.conn, schema="other")
            res2 = pdsql.read_table("test_schema_other2")
            tm.assert_frame_equal(res1, res2)

    def test_copy_from_callable_insertion_method(self):
        # GH 8953
        # Example in io.rst found under _io.sql.method
        # not available in sqlite, mysql
        def psql_insert_copy(table, conn, keys, data_iter):
            # gets a DBAPI connection that can provide a cursor
            dbapi_conn = conn.connection
            with dbapi_conn.cursor() as cur:
                s_buf = StringIO()
                writer = csv.writer(s_buf)
                writer.writerows(data_iter)
                s_buf.seek(0)

                columns = ", ".join('"{}"'.format(k) for k in keys)
                if table.schema:
                    table_name = "{}.{}".format(table.schema, table.name)
                else:
                    table_name = table.name

                sql_query = "COPY {} ({}) FROM STDIN WITH CSV".format(
                    table_name, columns
                )
                cur.copy_expert(sql=sql_query, file=s_buf)

        expected = DataFrame({"col1": [1, 2], "col2": [0.1, 0.2], "col3": ["a", "n"]})
        expected.to_sql(
            "test_copy_insert", self.conn, index=False, method=psql_insert_copy
        )
        result = sql.read_sql_table("test_copy_insert", self.conn)
        tm.assert_frame_equal(result, expected)


@pytest.mark.single
@pytest.mark.db
class TestMySQLAlchemy(_TestMySQLAlchemy, _TestSQLAlchemy):
    pass


@pytest.mark.single
@pytest.mark.db
class TestMySQLAlchemyConn(_TestMySQLAlchemy, _TestSQLAlchemyConn):
    pass


@pytest.mark.single
@pytest.mark.db
class TestPostgreSQLAlchemy(_TestPostgreSQLAlchemy, _TestSQLAlchemy):
    pass


@pytest.mark.single
@pytest.mark.db
class TestPostgreSQLAlchemyConn(_TestPostgreSQLAlchemy, _TestSQLAlchemyConn):
    pass


@pytest.mark.single
class TestSQLiteAlchemy(_TestSQLiteAlchemy, _TestSQLAlchemy):
    pass


@pytest.mark.single
class TestSQLiteAlchemyConn(_TestSQLiteAlchemy, _TestSQLAlchemyConn):
    pass


# -----------------------------------------------------------------------------
# -- Test Sqlite / MySQL fallback


@pytest.mark.single
class TestSQLiteFallback(SQLiteMixIn, PandasSQLTest):
    """
    Test the fallback mode against an in-memory sqlite database.

    """

    flavor = "sqlite"

    @classmethod
    def connect(cls):
        return sqlite3.connect(":memory:")

    def setup_connect(self):
        self.conn = self.connect()

    def load_test_data_and_sql(self):
        self.pandasSQL = sql.SQLiteDatabase(self.conn)
        self._load_test1_data()

    @pytest.fixture(autouse=True)
    def setup_method(self, load_iris_data):
        self.load_test_data_and_sql()

    def test_read_sql(self):
        self._read_sql_iris()

    def test_read_sql_parameter(self):
        self._read_sql_iris_parameter()

    def test_read_sql_named_parameter(self):
        self._read_sql_iris_named_parameter()

    def test_to_sql(self):
        self._to_sql()

    def test_to_sql_empty(self):
        self._to_sql_empty()

    def test_to_sql_fail(self):
        self._to_sql_fail()

    def test_to_sql_replace(self):
        self._to_sql_replace()

    def test_to_sql_append(self):
        self._to_sql_append()

    def test_create_and_drop_table(self):
        temp_frame = DataFrame(
            {"one": [1.0, 2.0, 3.0, 4.0], "two": [4.0, 3.0, 2.0, 1.0]}
        )

        self.pandasSQL.to_sql(temp_frame, "drop_test_frame")

        assert self.pandasSQL.has_table("drop_test_frame")

        self.pandasSQL.drop_table("drop_test_frame")

        assert not self.pandasSQL.has_table("drop_test_frame")

    def test_roundtrip(self):
        self._roundtrip()

    def test_execute_sql(self):
        self._execute_sql()

    def test_datetime_date(self):
        # test support for datetime.date
        df = DataFrame([date(2014, 1, 1), date(2014, 1, 2)], columns=["a"])
        df.to_sql("test_date", self.conn, index=False)
        res = read_sql_query("SELECT * FROM test_date", self.conn)
        if self.flavor == "sqlite":
            # comes back as strings
            tm.assert_frame_equal(res, df.astype(str))
        elif self.flavor == "mysql":
            tm.assert_frame_equal(res, df)

    def test_datetime_time(self):
        # test support for datetime.time, GH #8341
        df = DataFrame([time(9, 0, 0), time(9, 1, 30)], columns=["a"])
        df.to_sql("test_time", self.conn, index=False)
        res = read_sql_query("SELECT * FROM test_time", self.conn)
        if self.flavor == "sqlite":
            # comes back as strings
            expected = df.applymap(lambda _: _.strftime("%H:%M:%S.%f"))
            tm.assert_frame_equal(res, expected)

    def _get_index_columns(self, tbl_name):
        ixs = sql.read_sql_query(
            "SELECT * FROM sqlite_master WHERE type = 'index' "
            + "AND tbl_name = '{tbl_name}'".format(tbl_name=tbl_name),
            self.conn,
        )
        ix_cols = []
        for ix_name in ixs.name:
            ix_info = sql.read_sql_query(
                "PRAGMA index_info({ix_name})".format(ix_name=ix_name), self.conn
            )
            ix_cols.append(ix_info.name.tolist())
        return ix_cols

    def test_to_sql_save_index(self):
        self._to_sql_save_index()

    def test_transactions(self):
        if PY36:
            pytest.skip("not working on python > 3.5")
        self._transaction_test()

    def _get_sqlite_column_type(self, table, column):
        recs = self.conn.execute("PRAGMA table_info({table})".format(table=table))
        for cid, name, ctype, not_null, default, pk in recs:
            if name == column:
                return ctype
        raise ValueError(
            "Table {table}, column {column} not found".format(
                table=table, column=column
            )
        )

    def test_dtype(self):
        if self.flavor == "mysql":
            pytest.skip("Not applicable to MySQL legacy")
        cols = ["A", "B"]
        data = [(0.8, True), (0.9, None)]
        df = DataFrame(data, columns=cols)
        df.to_sql("dtype_test", self.conn)
        df.to_sql("dtype_test2", self.conn, dtype={"B": "STRING"})

        # sqlite stores Boolean values as INTEGER
        assert self._get_sqlite_column_type("dtype_test", "B") == "INTEGER"

        assert self._get_sqlite_column_type("dtype_test2", "B") == "STRING"
        msg = r"B \(<class 'bool'>\) not a string"
        with pytest.raises(ValueError, match=msg):
            df.to_sql("error", self.conn, dtype={"B": bool})

        # single dtype
        df.to_sql("single_dtype_test", self.conn, dtype="STRING")
        assert self._get_sqlite_column_type("single_dtype_test", "A") == "STRING"
        assert self._get_sqlite_column_type("single_dtype_test", "B") == "STRING"

    def test_notna_dtype(self):
        if self.flavor == "mysql":
            pytest.skip("Not applicable to MySQL legacy")

        cols = {
            "Bool": Series([True, None]),
            "Date": Series([datetime(2012, 5, 1), None]),
            "Int": Series([1, None], dtype="object"),
            "Float": Series([1.1, None]),
        }
        df = DataFrame(cols)

        tbl = "notna_dtype_test"
        df.to_sql(tbl, self.conn)

        assert self._get_sqlite_column_type(tbl, "Bool") == "INTEGER"
        assert self._get_sqlite_column_type(tbl, "Date") == "TIMESTAMP"
        assert self._get_sqlite_column_type(tbl, "Int") == "INTEGER"
        assert self._get_sqlite_column_type(tbl, "Float") == "REAL"

    def test_illegal_names(self):
        # For sqlite, these should work fine
        df = DataFrame([[1, 2], [3, 4]], columns=["a", "b"])

        msg = "Empty table or column name specified"
        with pytest.raises(ValueError, match=msg):
            df.to_sql("", self.conn)

        for ndx, weird_name in enumerate(
            [
                "test_weird_name]",
                "test_weird_name[",
                "test_weird_name`",
                'test_weird_name"',
                "test_weird_name'",
                "_b.test_weird_name_01-30",
                '"_b.test_weird_name_01-30"',
                "99beginswithnumber",
                "12345",
                "\xe9",
            ]
        ):
            df.to_sql(weird_name, self.conn)
            sql.table_exists(weird_name, self.conn)

            df2 = DataFrame([[1, 2], [3, 4]], columns=["a", weird_name])
            c_tbl = "test_weird_col_name{ndx:d}".format(ndx=ndx)
            df2.to_sql(c_tbl, self.conn)
            sql.table_exists(c_tbl, self.conn)


# -----------------------------------------------------------------------------
# -- Old tests from 0.13.1 (before refactor using sqlalchemy)


def date_format(dt):
    """Returns date in YYYYMMDD format."""
    return dt.strftime("%Y%m%d")


_formatters = {
    datetime: "'{}'".format,
    str: "'{}'".format,
    np.str_: "'{}'".format,
    bytes: "'{}'".format,
    float: "{:.8f}".format,
    int: "{:d}".format,
    type(None): lambda x: "NULL",
    np.float64: "{:.10f}".format,
    bool: "'{!s}'".format,
}


def format_query(sql, *args):
    """

    """
    processed_args = []
    for arg in args:
        if isinstance(arg, float) and isna(arg):
            arg = None

        formatter = _formatters[type(arg)]
        processed_args.append(formatter(arg))

    return sql % tuple(processed_args)


def tquery(query, con=None, cur=None):
    """Replace removed sql.tquery function"""
    res = sql.execute(query, con=con, cur=cur).fetchall()
    if res is None:
        return None
    else:
        return list(res)


@pytest.mark.single
class TestXSQLite(SQLiteMixIn):
    @pytest.fixture(autouse=True)
    def setup_method(self, request, datapath):
        self.method = request.function
        self.conn = sqlite3.connect(":memory:")

        # In some test cases we may close db connection
        # Re-open conn here so we can perform cleanup in teardown
        yield
        self.method = request.function
        self.conn = sqlite3.connect(":memory:")

    def test_basic(self):
        frame = tm.makeTimeDataFrame()
        self._check_roundtrip(frame)

    def test_write_row_by_row(self):

        frame = tm.makeTimeDataFrame()
        frame.iloc[0, 0] = np.nan
        create_sql = sql.get_schema(frame, "test")
        cur = self.conn.cursor()
        cur.execute(create_sql)

        cur = self.conn.cursor()

        ins = "INSERT INTO test VALUES (%s, %s, %s, %s)"
        for idx, row in frame.iterrows():
            fmt_sql = format_query(ins, *row)
            tquery(fmt_sql, cur=cur)

        self.conn.commit()

        result = sql.read_sql("select * from test", con=self.conn)
        result.index = frame.index
        tm.assert_frame_equal(result, frame, check_less_precise=True)

    def test_execute(self):
        frame = tm.makeTimeDataFrame()
        create_sql = sql.get_schema(frame, "test")
        cur = self.conn.cursor()
        cur.execute(create_sql)
        ins = "INSERT INTO test VALUES (?, ?, ?, ?)"

        row = frame.iloc[0]
        sql.execute(ins, self.conn, params=tuple(row))
        self.conn.commit()

        result = sql.read_sql("select * from test", self.conn)
        result.index = frame.index[:1]
        tm.assert_frame_equal(result, frame[:1])

    def test_schema(self):
        frame = tm.makeTimeDataFrame()
        create_sql = sql.get_schema(frame, "test")
        lines = create_sql.splitlines()
        for l in lines:
            tokens = l.split(" ")
            if len(tokens) == 2 and tokens[0] == "A":
                assert tokens[1] == "DATETIME"

        frame = tm.makeTimeDataFrame()
        create_sql = sql.get_schema(frame, "test", keys=["A", "B"])
        lines = create_sql.splitlines()
        assert 'PRIMARY KEY ("A", "B")' in create_sql
        cur = self.conn.cursor()
        cur.execute(create_sql)

    def test_execute_fail(self):
        create_sql = """
        CREATE TABLE test
        (
        a TEXT,
        b TEXT,
        c REAL,
        PRIMARY KEY (a, b)
        );
        """
        cur = self.conn.cursor()
        cur.execute(create_sql)

        sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn)
        sql.execute('INSERT INTO test VALUES("foo", "baz", 2.567)', self.conn)

        with pytest.raises(Exception):
            sql.execute('INSERT INTO test VALUES("foo", "bar", 7)', self.conn)

    def test_execute_closed_connection(self):
        create_sql = """
        CREATE TABLE test
        (
        a TEXT,
        b TEXT,
        c REAL,
        PRIMARY KEY (a, b)
        );
        """
        cur = self.conn.cursor()
        cur.execute(create_sql)

        sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn)
        self.conn.close()

        with pytest.raises(Exception):
            tquery("select * from test", con=self.conn)

    def test_na_roundtrip(self):
        pass

    def _check_roundtrip(self, frame):
        sql.to_sql(frame, name="test_table", con=self.conn, index=False)
        result = sql.read_sql("select * from test_table", self.conn)

        # HACK! Change this once indexes are handled properly.
        result.index = frame.index

        expected = frame
        tm.assert_frame_equal(result, expected)

        frame["txt"] = ["a"] * len(frame)
        frame2 = frame.copy()
        new_idx = Index(np.arange(len(frame2))) + 10
        frame2["Idx"] = new_idx.copy()
        sql.to_sql(frame2, name="test_table2", con=self.conn, index=False)
        result = sql.read_sql("select * from test_table2", self.conn, index_col="Idx")
        expected = frame.copy()
        expected.index = new_idx
        expected.index.name = "Idx"
        tm.assert_frame_equal(expected, result)

    def test_keyword_as_column_names(self):
        df = DataFrame({"From": np.ones(5)})
        sql.to_sql(df, con=self.conn, name="testkeywords", index=False)

    def test_onecolumn_of_integer(self):
        # GH 3628
        # a column_of_integers dataframe should transfer well to sql

        mono_df = DataFrame([1, 2], columns=["c0"])
        sql.to_sql(mono_df, con=self.conn, name="mono_df", index=False)
        # computing the sum via sql
        con_x = self.conn
        the_sum = sum(my_c0[0] for my_c0 in con_x.execute("select * from mono_df"))
        # it should not fail, and gives 3 ( Issue #3628 )
        assert the_sum == 3

        result = sql.read_sql("select * from mono_df", con_x)
        tm.assert_frame_equal(result, mono_df)

    def test_if_exists(self):
        df_if_exists_1 = DataFrame({"col1": [1, 2], "col2": ["A", "B"]})
        df_if_exists_2 = DataFrame({"col1": [3, 4, 5], "col2": ["C", "D", "E"]})
        table_name = "table_if_exists"
        sql_select = "SELECT * FROM {table_name}".format(table_name=table_name)

        def clean_up(test_table_to_drop):
            """
            Drops tables created from individual tests
            so no dependencies arise from sequential tests
            """
            self.drop_table(test_table_to_drop)

        msg = "'notvalidvalue' is not valid for if_exists"
        with pytest.raises(ValueError, match=msg):
            sql.to_sql(
                frame=df_if_exists_1,
                con=self.conn,
                name=table_name,
                if_exists="notvalidvalue",
            )
        clean_up(table_name)

        # test if_exists='fail'
        sql.to_sql(
            frame=df_if_exists_1, con=self.conn, name=table_name, if_exists="fail"
        )
        msg = "Table 'table_if_exists' already exists"
        with pytest.raises(ValueError, match=msg):
            sql.to_sql(
                frame=df_if_exists_1, con=self.conn, name=table_name, if_exists="fail"
            )
        # test if_exists='replace'
        sql.to_sql(
            frame=df_if_exists_1,
            con=self.conn,
            name=table_name,
            if_exists="replace",
            index=False,
        )
        assert tquery(sql_select, con=self.conn) == [(1, "A"), (2, "B")]
        sql.to_sql(
            frame=df_if_exists_2,
            con=self.conn,
            name=table_name,
            if_exists="replace",
            index=False,
        )
        assert tquery(sql_select, con=self.conn) == [(3, "C"), (4, "D"), (5, "E")]
        clean_up(table_name)

        # test if_exists='append'
        sql.to_sql(
            frame=df_if_exists_1,
            con=self.conn,
            name=table_name,
            if_exists="fail",
            index=False,
        )
        assert tquery(sql_select, con=self.conn) == [(1, "A"), (2, "B")]
        sql.to_sql(
            frame=df_if_exists_2,
            con=self.conn,
            name=table_name,
            if_exists="append",
            index=False,
        )
        assert tquery(sql_select, con=self.conn) == [
            (1, "A"),
            (2, "B"),
            (3, "C"),
            (4, "D"),
            (5, "E"),
        ]
        clean_up(table_name)


@pytest.mark.single
@pytest.mark.db
@pytest.mark.skip(
    reason="gh-13611: there is no support for MySQL if SQLAlchemy is not installed"
)
class TestXMySQL(MySQLMixIn):
    @pytest.fixture(autouse=True, scope="class")
    def setup_class(cls):
        pymysql = pytest.importorskip("pymysql")
        pymysql.connect(host="localhost", user="root", passwd="", db="pandas_nosetest")
        try:
            pymysql.connect(read_default_group="pandas")
        except pymysql.ProgrammingError:
            raise RuntimeError(
                "Create a group of connection parameters under the heading "
                "[pandas] in your system's mysql default file, "
                "typically located at ~/.my.cnf or /etc/.my.cnf."
            )
        except pymysql.Error:
            raise RuntimeError(
                "Cannot connect to database. "
                "Create a group of connection parameters under the heading "
                "[pandas] in your system's mysql default file, "
                "typically located at ~/.my.cnf or /etc/.my.cnf."
            )

    @pytest.fixture(autouse=True)
    def setup_method(self, request, datapath):
        pymysql = pytest.importorskip("pymysql")
        pymysql.connect(host="localhost", user="root", passwd="", db="pandas_nosetest")
        try:
            pymysql.connect(read_default_group="pandas")
        except pymysql.ProgrammingError:
            raise RuntimeError(
                "Create a group of connection parameters under the heading "
                "[pandas] in your system's mysql default file, "
                "typically located at ~/.my.cnf or /etc/.my.cnf."
            )
        except pymysql.Error:
            raise RuntimeError(
                "Cannot connect to database. "
                "Create a group of connection parameters under the heading "
                "[pandas] in your system's mysql default file, "
                "typically located at ~/.my.cnf or /etc/.my.cnf."
            )

        self.method = request.function

    def test_basic(self):
        frame = tm.makeTimeDataFrame()
        self._check_roundtrip(frame)

    def test_write_row_by_row(self):
        frame = tm.makeTimeDataFrame()
        frame.iloc[0, 0] = np.nan
        drop_sql = "DROP TABLE IF EXISTS test"
        create_sql = sql.get_schema(frame, "test")
        cur = self.conn.cursor()
        cur.execute(drop_sql)
        cur.execute(create_sql)
        ins = "INSERT INTO test VALUES (%s, %s, %s, %s)"
        for idx, row in frame.iterrows():
            fmt_sql = format_query(ins, *row)
            tquery(fmt_sql, cur=cur)

        self.conn.commit()

        result = sql.read_sql("select * from test", con=self.conn)
        result.index = frame.index
        tm.assert_frame_equal(result, frame, check_less_precise=True)

    def test_chunksize_read_type(self):
        frame = tm.makeTimeDataFrame()
        frame.index.name = "index"
        drop_sql = "DROP TABLE IF EXISTS test"
        cur = self.conn.cursor()
        cur.execute(drop_sql)
        sql.to_sql(frame, name="test", con=self.conn)
        query = "select * from test"
        chunksize = 5
        chunk_gen = pd.read_sql_query(
            sql=query, con=self.conn, chunksize=chunksize, index_col="index"
        )
        chunk_df = next(chunk_gen)
        tm.assert_frame_equal(frame[:chunksize], chunk_df)

    def test_execute(self):
        frame = tm.makeTimeDataFrame()
        drop_sql = "DROP TABLE IF EXISTS test"
        create_sql = sql.get_schema(frame, "test")
        cur = self.conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "Unknown table.*")
            cur.execute(drop_sql)
        cur.execute(create_sql)
        ins = "INSERT INTO test VALUES (%s, %s, %s, %s)"

        row = frame.iloc[0].values.tolist()
        sql.execute(ins, self.conn, params=tuple(row))
        self.conn.commit()

        result = sql.read_sql("select * from test", self.conn)
        result.index = frame.index[:1]
        tm.assert_frame_equal(result, frame[:1])

    def test_schema(self):
        frame = tm.makeTimeDataFrame()
        create_sql = sql.get_schema(frame, "test")
        lines = create_sql.splitlines()
        for l in lines:
            tokens = l.split(" ")
            if len(tokens) == 2 and tokens[0] == "A":
                assert tokens[1] == "DATETIME"

        frame = tm.makeTimeDataFrame()
        drop_sql = "DROP TABLE IF EXISTS test"
        create_sql = sql.get_schema(frame, "test", keys=["A", "B"])
        lines = create_sql.splitlines()
        assert "PRIMARY KEY (`A`, `B`)" in create_sql
        cur = self.conn.cursor()
        cur.execute(drop_sql)
        cur.execute(create_sql)

    def test_execute_fail(self):
        drop_sql = "DROP TABLE IF EXISTS test"
        create_sql = """
        CREATE TABLE test
        (
        a TEXT,
        b TEXT,
        c REAL,
        PRIMARY KEY (a(5), b(5))
        );
        """
        cur = self.conn.cursor()
        cur.execute(drop_sql)
        cur.execute(create_sql)

        sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn)
        sql.execute('INSERT INTO test VALUES("foo", "baz", 2.567)', self.conn)

        with pytest.raises(Exception):
            sql.execute('INSERT INTO test VALUES("foo", "bar", 7)', self.conn)

    def test_execute_closed_connection(self, request, datapath):
        drop_sql = "DROP TABLE IF EXISTS test"
        create_sql = """
        CREATE TABLE test
        (
        a TEXT,
        b TEXT,
        c REAL,
        PRIMARY KEY (a(5), b(5))
        );
        """
        cur = self.conn.cursor()
        cur.execute(drop_sql)
        cur.execute(create_sql)

        sql.execute('INSERT INTO test VALUES("foo", "bar", 1.234)', self.conn)
        self.conn.close()

        with pytest.raises(Exception):
            tquery("select * from test", con=self.conn)

        # Initialize connection again (needed for tearDown)
        self.setup_method(request, datapath)

    def test_na_roundtrip(self):
        pass

    def _check_roundtrip(self, frame):
        drop_sql = "DROP TABLE IF EXISTS test_table"
        cur = self.conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "Unknown table.*")
            cur.execute(drop_sql)
        sql.to_sql(frame, name="test_table", con=self.conn, index=False)
        result = sql.read_sql("select * from test_table", self.conn)

        # HACK! Change this once indexes are handled properly.
        result.index = frame.index
        result.index.name = frame.index.name

        expected = frame
        tm.assert_frame_equal(result, expected)

        frame["txt"] = ["a"] * len(frame)
        frame2 = frame.copy()
        index = Index(np.arange(len(frame2))) + 10
        frame2["Idx"] = index
        drop_sql = "DROP TABLE IF EXISTS test_table2"
        cur = self.conn.cursor()
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "Unknown table.*")
            cur.execute(drop_sql)
        sql.to_sql(frame2, name="test_table2", con=self.conn, index=False)
        result = sql.read_sql("select * from test_table2", self.conn, index_col="Idx")
        expected = frame.copy()

        # HACK! Change this once indexes are handled properly.
        expected.index = index
        expected.index.names = result.index.names
        tm.assert_frame_equal(expected, result)

    def test_keyword_as_column_names(self):
        df = DataFrame({"From": np.ones(5)})
        sql.to_sql(
            df, con=self.conn, name="testkeywords", if_exists="replace", index=False
        )

    def test_if_exists(self):
        df_if_exists_1 = DataFrame({"col1": [1, 2], "col2": ["A", "B"]})
        df_if_exists_2 = DataFrame({"col1": [3, 4, 5], "col2": ["C", "D", "E"]})
        table_name = "table_if_exists"
        sql_select = "SELECT * FROM {table_name}".format(table_name=table_name)

        def clean_up(test_table_to_drop):
            """
            Drops tables created from individual tests
            so no dependencies arise from sequential tests
            """
            self.drop_table(test_table_to_drop)

        # test if invalid value for if_exists raises appropriate error
        with pytest.raises(ValueError, match="<insert message here>"):
            sql.to_sql(
                frame=df_if_exists_1,
                con=self.conn,
                name=table_name,
                if_exists="notvalidvalue",
            )
        clean_up(table_name)

        # test if_exists='fail'
        sql.to_sql(
            frame=df_if_exists_1,
            con=self.conn,
            name=table_name,
            if_exists="fail",
            index=False,
        )
        with pytest.raises(ValueError, match="<insert message here>"):
            sql.to_sql(
                frame=df_if_exists_1, con=self.conn, name=table_name, if_exists="fail"
            )

        # test if_exists='replace'
        sql.to_sql(
            frame=df_if_exists_1,
            con=self.conn,
            name=table_name,
            if_exists="replace",
            index=False,
        )
        assert tquery(sql_select, con=self.conn) == [(1, "A"), (2, "B")]
        sql.to_sql(
            frame=df_if_exists_2,
            con=self.conn,
            name=table_name,
            if_exists="replace",
            index=False,
        )
        assert tquery(sql_select, con=self.conn) == [(3, "C"), (4, "D"), (5, "E")]
        clean_up(table_name)

        # test if_exists='append'
        sql.to_sql(
            frame=df_if_exists_1,
            con=self.conn,
            name=table_name,
            if_exists="fail",
            index=False,
        )
        assert tquery(sql_select, con=self.conn) == [(1, "A"), (2, "B")]
        sql.to_sql(
            frame=df_if_exists_2,
            con=self.conn,
            name=table_name,
            if_exists="append",
            index=False,
        )
        assert tquery(sql_select, con=self.conn) == [
            (1, "A"),
            (2, "B"),
            (3, "C"),
            (4, "D"),
            (5, "E"),
        ]
        clean_up(table_name)