# -*- coding: utf-8 -*-
"""
Collection of query wrappers / abstractions to both facilitate data
retrieval and to reduce dependency on DB-specific API.
"""
from __future__ import division, print_function
from contextlib import contextmanager
from datetime import date, datetime, time
from functools import partial
import re
import warnings
import numpy as np
import pandas._libs.lib as lib
from pandas.compat import (
map, raise_with_traceback, string_types, text_type, zip)
from pandas.core.dtypes.common import (
is_datetime64tz_dtype, is_dict_like, is_list_like)
from pandas.core.dtypes.dtypes import DatetimeTZDtype
from pandas.core.dtypes.missing import isna
from pandas.core.api import DataFrame, Series
from pandas.core.base import PandasObject
from pandas.core.tools.datetimes import to_datetime
class SQLAlchemyRequired(ImportError):
    """
    ImportError subclass dedicated to SQLAlchemy-dependent code paths.

    NOTE(review): presumably raised when an operation needs SQLAlchemy but
    it cannot be imported; the raise sites are outside this section.
    """
class DatabaseError(IOError):
    """
    IOError subclass used by this module for database-related failures.

    NOTE(review): the raise sites are outside this section; confirm the
    exact conditions against the callers.
    """
# -----------------------------------------------------------------------------
# -- Helper functions
_SQLALCHEMY_INSTALLED = None
def _is_sqlalchemy_connectable(con):
global _SQLALCHEMY_INSTALLED
if _SQLALCHEMY_INSTALLED is None:
try:
import sqlalchemy
_SQLALCHEMY_INSTALLED = True
from distutils.version import LooseVersion
ver = sqlalchemy.__version__
# For sqlalchemy versions < 0.8.2, the BIGINT type is recognized
# for a sqlite engine, which results in a warning when trying to
# read/write a DataFrame with int64 values. (GH7433)
if LooseVersion(ver) < LooseVersion('0.8.2'):
from sqlalchemy import BigInteger
from sqlalchemy.ext.compiler import compiles
@compiles(BigInteger, 'sqlite')
def compile_big_int_sqlite(type_, compiler, **kw):
return 'INTEGER'
except ImportError:
_SQLALCHEMY_INSTALLED = False
if _SQLALCHEMY_INSTALLED:
import sqlalchemy
return isinstance(con, sqlalchemy.engine.Connectable)
else:
return False
def _convert_params(sql, params):
"""Convert SQL and params args to DBAPI2.0 compliant format."""
args = [sql]
if params is not None:
if hasattr(params, 'keys'): # test if params is a mapping
args += [params]
else:
args += [list(params)]
return args
def _process_parse_dates_argument(parse_dates):
"""Process parse_dates argument for read_sql functions"""
# handle non-list entries for parse_dates gracefully
if parse_dates is True or parse_dates is None or parse_dates is False:
parse_dates = []
elif not hasattr(parse_dates, '__iter__'):
parse_dates = [parse_dates]
return parse_dates
def _handle_date_column(col, utc=None, format=None):
if isinstance(format, dict):
return to_datetime(col, errors='ignore', **format)
else:
# Allow passing of formatting string for integers
# GH17855
if format is None and (issubclass(col.dtype.type, np.floating) or
issubclass(col.dtype.type, np.integer)):
format = 's'
if format in ['D', 'd', 'h', 'm', 's', 'ms', 'us', 'ns']:
return to_datetime(col, errors='coerce', unit=format, utc=utc)
elif is_datetime64tz_dtype(col):
# coerce to UTC timezone
# GH11216
return to_datetime(col, utc=True)
else:
return to_datetime(col, errors='coerce', format=format, utc=utc)
def _parse_date_columns(data_frame, parse_dates):
    """
    Force non-datetime columns to be read as such.

    Supports both string formatted and integer timestamp columns.

    Parameters
    ----------
    data_frame : DataFrame
        Frame whose columns are converted. It is modified in place (the
        converted columns are assigned back onto it).
    parse_dates : bool, None, scalar, list or dict
        Columns to convert, normalised through
        ``_process_parse_dates_argument``. For a dict, the value stored
        under a column name is handed to ``_handle_date_column`` as
        ``format``.

    Returns
    -------
    DataFrame
        The same ``data_frame`` object that was passed in.
    """
    parse_dates = _process_parse_dates_argument(parse_dates)

    # we want to coerce datetime64_tz dtypes for now to UTC
    # we could in theory do a 'nice' conversion from a FixedOffset tz
    # GH11216
    for col_name, df_col in data_frame.iteritems():
        if is_datetime64tz_dtype(df_col) or col_name in parse_dates:
            try:
                fmt = parse_dates[col_name]
            except (KeyError, TypeError):
                # TypeError: parse_dates is a list, so there is no
                # per-column format to look up.
                # KeyError: parse_dates is a dict, but this column was only
                # selected because it is tz-aware and has no entry.
                fmt = None
            data_frame[col_name] = _handle_date_column(df_col, format=fmt)

    return data_frame
def _wrap_result(data, columns, index_col=None, coerce_float=True,
                 parse_dates=None):
    """
    Wrap result set of query in a DataFrame.

    Parameters
    ----------
    data : records accepted by ``DataFrame.from_records``
        Rows of the result set.
    columns : list
        Column labels forwarded to ``DataFrame.from_records``.
    index_col : label or list of labels, optional
        When not None, passed to ``set_index`` on the resulting frame.
    coerce_float : bool, default True
        Forwarded to ``DataFrame.from_records``.
    parse_dates : list or dict, optional
        Forwarded to ``_parse_date_columns``.

    Returns
    -------
    DataFrame
    """
    records_frame = DataFrame.from_records(
        data, columns=columns, coerce_float=coerce_float)
    records_frame = _parse_date_columns(records_frame, parse_dates)

    if index_col is None:
        return records_frame

    records_frame.set_index(index_col, inplace=True)
    return records_frame
def execute(sql, con, cur=None, params=None):
    """
    Execute the given SQL query using the provided connection object.

    Parameters
    ----------
    sql : string
        SQL query to be executed.
    con : SQLAlchemy connectable(engine/connection) or sqlite3 connection
        Using SQLAlchemy makes it possible to use any DB supported by the
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    cur : deprecated, cursor is obtained from connection, default: None
        When given, it is used instead of `con` to build the SQL wrapper.
    params : list or tuple, optional, default: None
        List of parameters to pass to execute method.

    Returns
    -------
    Results Iterable
    """
    # A supplied cursor takes precedence over the connection.
    if cur is not None:
        pandas_sql = pandasSQL_builder(cur, is_cursor=True)
    else:
        pandas_sql = pandasSQL_builder(con)

    statement_args = _convert_params(sql, params)
    return pandas_sql.execute(*statement_args)
# -----------------------------------------------------------------------------
# -- Read and write to DataFrames
def read_sql_table(table_name, con, schema=None, index_col=None,
                   coerce_float=True, parse_dates=None, columns=None,
                   chunksize=None):
    """
    Read SQL database table into a DataFrame.

    Given a table name and a SQLAlchemy connectable, returns a DataFrame.
    This function does not support DBAPI connections.

    Parameters
    ----------
    table_name : string
        Name of SQL table in database.
    con : SQLAlchemy connectable (or database string URI)
        SQLite DBAPI connection mode not supported.
    schema : string, default None
        Name of SQL schema in database to query (if database flavor
        supports this). Uses default schema if None (default).
    index_col : string or list of strings, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : boolean, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point. Can result in loss of precision.
    parse_dates : list or dict, default: None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`
          Especially useful with databases without native Datetime support,
          such as SQLite.
    columns : list, default: None
        List of column names to select from SQL table.
    chunksize : int, default None
        If specified, returns an iterator where `chunksize` is the number of
        rows to include in each chunk.

    Returns
    -------
    DataFrame

    Raises
    ------
    NotImplementedError
        If `con` is not a SQLAlchemy connectable.
    ValueError
        If the table cannot be reflected or cannot be read.

    See Also
    --------
    read_sql_query : Read SQL query into a DataFrame.
    read_sql

    Notes
    -----
    Any datetime values with time zone information will be converted to UTC.
    """
    con = _engine_builder(con)
    if not _is_sqlalchemy_connectable(con):
        raise NotImplementedError(
            "read_sql_table only supported for SQLAlchemy connectable.")

    # Imported lazily: SQLAlchemy is only needed once the check above passed.
    import sqlalchemy
    from sqlalchemy.schema import MetaData

    metadata = MetaData(con, schema=schema)
    try:
        # Reflect just the requested table; views are included as well.
        metadata.reflect(only=[table_name], views=True)
    except sqlalchemy.exc.InvalidRequestError:
        raise ValueError("Table {name} not found".format(name=table_name))

    database = SQLDatabase(con, meta=metadata)
    result = database.read_table(
        table_name, index_col=index_col, coerce_float=coerce_float,
        parse_dates=parse_dates, columns=columns, chunksize=chunksize)

    if result is None:
        # NOTE(review): unlike the reflection failure above, `con` is passed
        # as a second exception argument here - confirm this is intentional.
        raise ValueError(
            "Table {name} not found".format(name=table_name), con)
    return result
def read_sql_query(sql, con, index_col=None, coerce_float=True, params=None,
                   parse_dates=None, chunksize=None):
    """
    Read SQL query into a DataFrame.

    Returns a DataFrame corresponding to the result set of the query
    string. Optionally provide an `index_col` parameter to use one of the
    columns as the index, otherwise default integer index will be used.

    Parameters
    ----------
    sql : string SQL query or SQLAlchemy Selectable (select or text object)
        SQL query to be executed.
    con : SQLAlchemy connectable(engine/connection), database string URI,
        or sqlite3 DBAPI2 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    index_col : string or list of strings, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : boolean, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point. Useful for SQL result sets.
    params : list, tuple or dict, optional, default: None
        List of parameters to pass to execute method.  The syntax used
        to pass parameters is database driver dependent. Check your
        database driver documentation for which of the five syntax styles,
        described in PEP 249's paramstyle, is supported.
        Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}
    parse_dates : list or dict, default: None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times, or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`
          Especially useful with databases without native Datetime support,
          such as SQLite.
    chunksize : int, default None
        If specified, return an iterator where `chunksize` is the number of
        rows to include in each chunk.

    Returns
    -------
    DataFrame

    See Also
    --------
    read_sql_table : Read SQL database table into a DataFrame.
    read_sql

    Notes
    -----
    Any datetime values with time zone information parsed via the `parse_dates`
    parameter will be converted to UTC.
    """
    # Collect the reader options once, then delegate to the backend that
    # pandasSQL_builder selects for this connection.
    read_options = dict(
        index_col=index_col,
        params=params,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        chunksize=chunksize,
    )
    sql_backend = pandasSQL_builder(con)
    return sql_backend.read_query(sql, **read_options)
def read_sql(sql, con, index_col=None, coerce_float=True, params=None,
parse_dates=None, columns=None, chunksize=None):
"""
Read SQL query or database table into a DataFrame.
This function is a convenience wrapper around ``read_sql_table`` and
``read_sql_query`` (for backward compatibility). It will delegate
to the specific function depending on the provided input. A SQL query
will be routed to ``read_sql_query``, while a database table name will
be routed to ``read_sql_table``. Note that the delegated function might
have more specific notes about their functionality not listed here.
Parameters
----------
sql : string or SQLAlchemy Selectable (select or text object)
SQL query to be executed or a table name.
con : SQLAlchemy connectable (engine/connection) or database string URI
or DBAPI2 connection (fallback mode)
Using SQLAlchemy makes it possible to use any DB supported by that
library. If a DBAPI2 object, only sqlite3 is supported.
index_col : string or list of strings, optional, default: None
Column(s) to set as index(MultiIndex).
coerce_float : boolean, default True
Attempts to convert values of non-string, non-numeric objects (like
decimal.Decimal) to floating point, useful for SQL result sets.
params : list, tuple or dict, optional, default: None
List of parameters to pass to execute method. The syntax used
to pass parameters is database driver dependent. Check your
Loading ...