from datetime import datetime
import os
import platform
import numpy as np
import pytest
import pytz
import pandas as pd
from pandas import DataFrame
# Skip this whole module unless the Google Cloud client stack is installed.
api_exceptions = pytest.importorskip("google.api_core.exceptions")
bigquery = pytest.importorskip("google.cloud.bigquery")
service_account = pytest.importorskip("google.oauth2.service_account")
pandas_gbq = pytest.importorskip("pandas_gbq")
# Optional local overrides; when None, configuration falls back to env vars
# (see _get_project_id / _get_private_key_path below).
PROJECT_ID = None
PRIVATE_KEY_JSON_PATH = None
PRIVATE_KEY_JSON_CONTENTS = None
# Names of the BigQuery dataset/table used by the integration tests.
DATASET_ID = "pydata_pandas_bq_testing_py3"
TABLE_ID = "new_test"
# Fully-qualified "<dataset>.<table>" target; the "1" suffix matches the
# dataset created in TestToGBQIntegrationWithServiceAccountKeyPath.
DESTINATION_TABLE = "{0}.{1}".format(DATASET_ID + "1", TABLE_ID)
VERSION = platform.python_version()
def _skip_if_no_project_id():
    """Skip the calling test when no BigQuery project id is configured."""
    project = _get_project_id()
    if project:
        return
    pytest.skip("Cannot run integration tests without a project id")
def _skip_if_no_private_key_path():
    """Skip the calling test when no service-account key file is configured."""
    key_path = _get_private_key_path()
    if key_path:
        return
    pytest.skip("Cannot run integration tests without a private key json file path")
def _in_travis_environment():
return "TRAVIS_BUILD_DIR" in os.environ and "GBQ_PROJECT_ID" in os.environ
def _get_project_id():
    """Resolve the BigQuery project id from Travis or local configuration."""
    env_project = os.environ.get("GBQ_PROJECT_ID")
    if _in_travis_environment():
        return env_project
    # Module-level override wins outside of Travis.
    return PROJECT_ID or env_project
def _get_private_key_path():
    """Locate the service-account JSON key file, or None if not configured."""
    if _in_travis_environment():
        # Travis keeps the key at a fixed path inside the build directory.
        return os.path.join(
            os.environ.get("TRAVIS_BUILD_DIR"), "ci", "travis_gbq.json"
        )
    # Module-level override wins; otherwise fall back to the env var.
    return PRIVATE_KEY_JSON_PATH or os.environ.get(
        "GBQ_GOOGLE_APPLICATION_CREDENTIALS"
    )
def _get_credentials():
    """Build service-account credentials from the key file, if one is set.

    Returns None when no key path is configured, letting the client fall
    back to default credentials.
    """
    key_path = _get_private_key_path()
    if not key_path:
        return None
    return service_account.Credentials.from_service_account_file(key_path)
def _get_client():
    """Construct a google-cloud-bigquery Client for the test project."""
    return bigquery.Client(
        project=_get_project_id(), credentials=_get_credentials()
    )
def make_mixed_dataframe_v2(test_size):
    """Create a random DataFrame covering all BQ datatypes except RECORD.

    Parameters
    ----------
    test_size : int
        Number of rows to generate.

    Returns
    -------
    DataFrame
        Columns ``bools``, ``flts``, ``ints``, ``strs`` and a tz-aware
        ``times`` column, indexed 0..test_size-1.
    """
    bools = np.random.randint(2, size=(1, test_size)).astype(bool)
    flts = np.random.randn(1, test_size)
    ints = np.random.randint(1, 10, size=(1, test_size))
    strs = np.random.randint(1, 10, size=(1, test_size)).astype(str)
    times = [datetime.now(pytz.timezone("US/Arizona")) for _ in range(test_size)]
    return DataFrame(
        {
            "bools": bools[0],
            "flts": flts[0],
            "ints": ints[0],
            "strs": strs[0],
            # BUG FIX: pass the whole list so each row keeps its own
            # timestamp; the previous `times[0]` broadcast a single scalar
            # to every row, discarding the per-row values built above.
            "times": times,
        },
        index=range(test_size),
    )
def test_read_gbq_with_deprecated_kwargs(monkeypatch):
    """Deprecated keywords must be forwarded untouched to pandas_gbq."""
    forwarded = {}

    def fake_read_gbq(sql, **kwargs):
        forwarded.update(kwargs)
        return DataFrame([[1.0]])

    monkeypatch.setattr("pandas_gbq.read_gbq", fake_read_gbq)
    key = object()
    pd.read_gbq("SELECT 1", verbose=True, private_key=key)
    assert forwarded["verbose"]
    assert forwarded["private_key"] is key
def test_read_gbq_without_deprecated_kwargs(monkeypatch):
    """Deprecated keywords must not be injected when the caller omits them."""
    forwarded = {}

    def fake_read_gbq(sql, **kwargs):
        forwarded.update(kwargs)
        return DataFrame([[1.0]])

    monkeypatch.setattr("pandas_gbq.read_gbq", fake_read_gbq)
    pd.read_gbq("SELECT 1")
    assert "verbose" not in forwarded
    assert "private_key" not in forwarded
def test_read_gbq_with_new_kwargs(monkeypatch):
    """Newer keywords such as use_bqstorage_api must be passed through."""
    forwarded = {}

    def fake_read_gbq(sql, **kwargs):
        forwarded.update(kwargs)
        return DataFrame([[1.0]])

    monkeypatch.setattr("pandas_gbq.read_gbq", fake_read_gbq)
    pd.read_gbq("SELECT 1", use_bqstorage_api=True)
    assert forwarded["use_bqstorage_api"]
def test_read_gbq_without_new_kwargs(monkeypatch):
    """use_bqstorage_api must not be injected when the caller omits it."""
    forwarded = {}

    def fake_read_gbq(sql, **kwargs):
        forwarded.update(kwargs)
        return DataFrame([[1.0]])

    monkeypatch.setattr("pandas_gbq.read_gbq", fake_read_gbq)
    pd.read_gbq("SELECT 1")
    assert "use_bqstorage_api" not in forwarded
@pytest.mark.single
class TestToGBQIntegrationWithServiceAccountKeyPath:
    """Round-trip integration tests against a real BigQuery dataset."""

    @classmethod
    def setup_class(cls):
        # Runs exactly once before every test in this class: skip unless
        # credentials are available, then recreate a clean test dataset.
        _skip_if_no_project_id()
        _skip_if_no_private_key_path()
        cls.client = _get_client()
        cls.dataset = cls.client.dataset(DATASET_ID + "1")
        try:
            # Remove leftovers from a previous (possibly aborted) run.
            cls.client.delete_dataset(cls.dataset, delete_contents=True)
        except api_exceptions.NotFound:
            pass  # Nothing to clean up if the dataset never existed.
        cls.client.create_dataset(bigquery.Dataset(cls.dataset))

    @classmethod
    def teardown_class(cls):
        # Runs exactly once after all tests: drop the test dataset.
        cls.client.delete_dataset(cls.dataset, delete_contents=True)

    def test_roundtrip(self):
        """Upload a mixed-dtype frame, then verify its row count via SQL."""
        destination_table = DESTINATION_TABLE + "1"
        row_count = 20001
        frame = make_mixed_dataframe_v2(row_count)
        frame.to_gbq(
            destination_table,
            _get_project_id(),
            chunksize=None,
            credentials=_get_credentials(),
        )
        result = pd.read_gbq(
            "SELECT COUNT(*) AS num_rows FROM {0}".format(destination_table),
            project_id=_get_project_id(),
            credentials=_get_credentials(),
            dialect="standard",
        )
        assert result["num_rows"][0] == row_count