Repository URL to install this package:
|
Version:
1.12.0 ▾
|
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import datetime
import string
from pathlib import Path
from typing import Generator
import numpy
import pyarrow
import pyarrow.dataset
import pytest
from adbc_driver_postgresql import ConnectionOptions, StatementOptions, dbapi
@pytest.fixture
def postgres(postgres_uri: str) -> Generator[dbapi.Connection, None, None]:
with dbapi.connect(postgres_uri) as conn:
yield conn
def test_conn_current_catalog(postgres: dbapi.Connection) -> None:
assert postgres.adbc_current_catalog != ""
def test_conn_current_db_schema(postgres: dbapi.Connection) -> None:
assert postgres.adbc_current_db_schema == "public"
def test_conn_change_db_schema(postgres: dbapi.Connection) -> None:
assert postgres.adbc_current_db_schema == "public"
with postgres.cursor() as cur:
cur.execute("CREATE SCHEMA IF NOT EXISTS dbapischema")
assert postgres.adbc_current_db_schema == "public"
postgres.adbc_current_db_schema = "dbapischema"
assert postgres.adbc_current_db_schema == "dbapischema"
def test_get_objects_schema_filter_outside_search_path(
postgres: dbapi.Connection,
) -> None:
schema_name = "dbapi_get_objects_test"
table_name = "schema_filter_table"
# Regression test: adbc_get_objects(db_schema_filter=...) should not depend on the
# connection's current schema/search_path.
assert postgres.adbc_current_db_schema == "public"
with postgres.cursor() as cur:
cur.execute(f'CREATE SCHEMA IF NOT EXISTS "{schema_name}"')
cur.execute(f'DROP TABLE IF EXISTS "{schema_name}"."{table_name}"')
cur.execute(f'CREATE TABLE "{schema_name}"."{table_name}" (ints INT)')
postgres.commit()
assert postgres.adbc_current_db_schema == "public"
metadata = (
postgres.adbc_get_objects(
depth="tables",
db_schema_filter=schema_name,
table_name_filter=table_name,
)
.read_all()
.to_pylist()
)
catalog_name = postgres.adbc_current_catalog
catalog = next(
(row for row in metadata if row["catalog_name"] == catalog_name), None
)
assert catalog is not None
schemas = catalog["catalog_db_schemas"]
assert len(schemas) == 1
assert schemas[0]["db_schema_name"] == schema_name
tables = schemas[0]["db_schema_tables"]
assert len(tables) == 1
assert tables[0]["table_name"] == table_name
def test_conn_get_info(postgres: dbapi.Connection) -> None:
info = postgres.adbc_get_info()
assert info["driver_name"] == "ADBC PostgreSQL Driver"
assert info["driver_adbc_version"] == 1_001_000
assert info["vendor_name"] == "PostgreSQL"
def test_get_statistics(postgres: dbapi.Connection) -> None:
with postgres.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS test_statistics")
cur.execute("CREATE TABLE test_statistics (id INT PRIMARY KEY, value TEXT)")
cur.execute("INSERT INTO test_statistics VALUES (1, 'a'), (2, 'b'), (3, 'c')")
cur.execute("ANALYZE test_statistics")
postgres.commit()
# PostgreSQL requires db_schema to be specified and only supports approximate stats
reader = postgres.adbc_get_statistics(
db_schema_filter="public", table_name_filter="test_statistics", approximate=True
)
assert reader is not None
table = reader.read_all()
# Verify schema is correct
assert "catalog_name" in table.schema.names
assert "catalog_db_schemas" in table.schema.names
# Verify we got actual statistics for our table
result_list = table.to_pylist()
found_test_table = False
for catalog in result_list:
for schema in catalog["catalog_db_schemas"]:
assert schema["db_schema_name"] == "public"
found_test_table = found_test_table or any(
stat["table_name"] == "test_statistics"
for stat in schema["db_schema_statistics"]
)
assert found_test_table, "Expected statistics for 'test_statistics'"
def test_get_statistic_names(postgres: dbapi.Connection) -> None:
reader = postgres.adbc_get_statistic_names()
assert reader is not None
table = reader.read_all()
# Verify schema
assert "statistic_name" in table.schema.names
assert "statistic_key" in table.schema.names
def test_query_batch_size(postgres: dbapi.Connection):
with postgres.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS test_batch_size")
cur.execute("CREATE TABLE test_batch_size (ints INT)")
cur.execute(
"""
INSERT INTO test_batch_size (ints)
SELECT generated :: INT
FROM GENERATE_SERIES(1, 65536) temp(generated)
"""
)
cur.execute("SELECT * FROM test_batch_size")
table = cur.fetch_arrow_table()
assert len(table.to_batches()) == 1
cur.adbc_statement.set_options(
**{StatementOptions.BATCH_SIZE_HINT_BYTES.value: "1"}
)
assert (
cur.adbc_statement.get_option_int(
StatementOptions.BATCH_SIZE_HINT_BYTES.value
)
== 1
)
cur.execute("SELECT * FROM test_batch_size")
table = cur.fetch_arrow_table()
assert len(table.to_batches()) == 65536
cur.adbc_statement.set_options(
**{StatementOptions.BATCH_SIZE_HINT_BYTES.value: "4096"}
)
assert (
cur.adbc_statement.get_option_int(
StatementOptions.BATCH_SIZE_HINT_BYTES.value
)
== 4096
)
cur.execute("SELECT * FROM test_batch_size")
table = cur.fetch_arrow_table()
assert 64 <= len(table.to_batches()) <= 256
def test_query_cancel(postgres: dbapi.Connection) -> None:
with postgres.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS test_batch_size")
cur.execute("CREATE TABLE test_batch_size (ints INT)")
cur.execute(
"""
INSERT INTO test_batch_size (ints)
SELECT generated :: INT
FROM GENERATE_SERIES(1, 1048576) temp(generated)
"""
)
postgres.commit()
# Ensure different ways of reading all raise the desired error
with postgres.cursor() as cur:
cur.execute("SELECT * FROM test_batch_size")
cur.adbc_cancel()
with pytest.raises(postgres.OperationalError, match="canceling statement"):
cur.fetchone()
postgres.rollback()
with postgres.cursor() as cur:
cur.execute("SELECT * FROM test_batch_size")
cur.adbc_cancel()
with pytest.raises(postgres.OperationalError, match="canceling statement"):
cur.fetch_arrow_table()
postgres.rollback()
with postgres.cursor() as cur:
cur.execute("SELECT * FROM test_batch_size")
cur.adbc_cancel()
with pytest.raises(postgres.OperationalError, match="canceling statement"):
cur.fetch_df()
def test_query_execute_schema(postgres: dbapi.Connection) -> None:
with postgres.cursor() as cur:
schema = cur.adbc_execute_schema("SELECT 1 AS foo")
assert schema == pyarrow.schema([("foo", "int32")])
def test_query_invalid(postgres: dbapi.Connection) -> None:
with postgres.cursor() as cur:
with pytest.raises(
postgres.ProgrammingError, match="Failed to prepare query"
) as excinfo:
cur.execute("SELECT * FROM tabledoesnotexist")
assert excinfo.value.sqlstate == "42P01"
assert len(excinfo.value.details) > 0
def test_query_trivial(postgres: dbapi.Connection):
with postgres.cursor() as cur:
cur.execute("SELECT 1")
assert cur.fetchone() == (1,)
def test_stmt_ingest(postgres: dbapi.Connection) -> None:
table = pyarrow.table(
[
[1, 2, 3],
["a", None, "b"],
],
names=["ints", "strs"],
)
double_table = pyarrow.table(
[
[1, 1, 2, 2, 3, 3],
["a", "a", None, None, "b", "b"],
],
names=["ints", "strs"],
)
with postgres.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS test_ingest")
with pytest.raises(
postgres.ProgrammingError, match='"public.test_ingest" does not exist'
):
cur.adbc_ingest("test_ingest", table, mode="append")
postgres.rollback()
cur.adbc_ingest("test_ingest", table, mode="replace")
cur.execute("SELECT * FROM test_ingest ORDER BY ints")
assert cur.fetch_arrow_table() == table
postgres.commit()
with pytest.raises(
postgres.ProgrammingError, match='"test_ingest" already exists'
):
cur.adbc_ingest("test_ingest", table, mode="create")
postgres.rollback()
cur.adbc_ingest("test_ingest", table, mode="create_append")
cur.execute("SELECT * FROM test_ingest ORDER BY ints")
assert cur.fetch_arrow_table() == double_table
cur.adbc_ingest("test_ingest", table, mode="replace")
cur.execute("SELECT * FROM test_ingest ORDER BY ints")
assert cur.fetch_arrow_table() == table
cur.execute("DROP TABLE IF EXISTS test_ingest")
cur.adbc_ingest("test_ingest", table, mode="create_append")
cur.execute("SELECT * FROM test_ingest ORDER BY ints")
assert cur.fetch_arrow_table() == table
cur.execute("DROP TABLE IF EXISTS test_ingest")
cur.adbc_ingest("test_ingest", table, mode="create")
cur.execute("SELECT * FROM test_ingest ORDER BY ints")
assert cur.fetch_arrow_table() == table
def test_stmt_ingest_dataset(postgres: dbapi.Connection, tmp_path: Path) -> None:
# Regression test for https://github.com/apache/arrow-adbc/issues/1310
table = pyarrow.table(
[
[1, 1, 2, 2, 3, 3],
["a", "a", None, None, "b", "b"],
],
schema=pyarrow.schema([("ints", "int32"), ("strs", "string")]),
)
pyarrow.dataset.write_dataset(
table, tmp_path, format="parquet", partitioning=["ints"]
)
ds = pyarrow.dataset.dataset(tmp_path, format="parquet", partitioning=["ints"])
with postgres.cursor() as cur:
for item in (
lambda: ds,
lambda: ds.scanner(),
lambda: ds.scanner().to_reader(),
lambda: ds.scanner().to_table(),
):
cur.execute("DROP TABLE IF EXISTS test_ingest")
cur.adbc_ingest(
"test_ingest",
item(),
mode="create_append",
)
cur.execute("SELECT ints, strs FROM test_ingest ORDER BY ints")
assert cur.fetch_arrow_table() == table
def test_stmt_ingest_multi(postgres: dbapi.Connection) -> None:
# Regression test for https://github.com/apache/arrow-adbc/issues/1310
table = pyarrow.table(
[
[1, 1, 2, 2, 3, 3],
["a", "a", None, None, "b", "b"],
],
names=["ints", "strs"],
)
with postgres.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS test_ingest")
cur.adbc_ingest(
"test_ingest",
table.to_batches(max_chunksize=2),
mode="create_append",
)
cur.execute("SELECT * FROM test_ingest ORDER BY ints")
assert cur.fetch_arrow_table() == table
def test_stmt_ingest_timestamptz(postgres: dbapi.Connection) -> None:
# Regression test for https://github.com/apache/arrow-adbc/issues/2901
table = pyarrow.table(
[
[1],
[datetime.datetime(2023, 1, 2, 3, 4, 5, tzinfo=datetime.timezone.utc)],
],
names=["ints", "tstz"],
)
with postgres.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS test_ingest")
# Make sure we aren't in UTC time zone
cur.execute("SET TIME ZONE 'Asia/Tokyo'")
postgres.commit()
cur.execute("SELECT current_setting('TIMEZONE')")
assert cur.fetchone() == ("Asia/Tokyo",)
cur.adbc_ingest(
"test_ingest",
table,
mode="create",
)
postgres.commit()
cur.execute("SELECT * FROM test_ingest ORDER BY ints")
assert cur.fetch_arrow_table() == table
def test_ddl(postgres: dbapi.Connection):
with postgres.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS test_ddl")
assert cur.fetchone() is None
cur.execute("CREATE TABLE test_ddl (ints INT)")
assert cur.fetchone() is None
cur.execute("INSERT INTO test_ddl VALUES (1) RETURNING ints")
assert cur.fetchone() == (1,)
cur.execute("SELECT * FROM test_ddl")
assert cur.fetchone() == (1,)
def test_crash(postgres: dbapi.Connection) -> None:
with postgres.cursor() as cur:
cur.execute("SELECT 1")
assert cur.fetchone() == (1,)
def test_reuse(postgres: dbapi.Connection) -> None:
with postgres.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS test_batch_size")
cur.execute("CREATE TABLE test_batch_size (ints INT)")
cur.execute(
"""
INSERT INTO test_batch_size (ints)
SELECT generated :: INT
FROM GENERATE_SERIES(1, 65536) temp(generated)
"""
)
cur.execute("SELECT * FROM test_batch_size ORDER BY ints ASC")
assert cur.fetchone() == (1,)
cur.execute("SELECT 1")
assert cur.fetchone() == (1,)
cur.execute("SELECT 2")
assert cur.fetchone() == (2,)
def test_ingest(postgres: dbapi.Connection) -> None:
table = pyarrow.Table.from_pydict({"numbers": [1, 2], "letters": ["a", "b"]})
with postgres.cursor() as cur:
cur.adbc_ingest("foo", table, mode="replace", db_schema_name="public")
cur.execute("SELECT * FROM public.foo")
assert cur.fetch_arrow_table() == table
with pytest.raises(dbapi.NotSupportedError):
cur.adbc_ingest("foo", table, catalog_name="main")
def test_ingest_schema(postgres: dbapi.Connection) -> None:
table = pyarrow.Table.from_pydict({"numbers": [1, 2], "letters": ["a", "b"]})
with postgres.cursor() as cur:
cur.execute("CREATE SCHEMA IF NOT EXISTS testschema")
cur.execute("DROP TABLE IF EXISTS testschema.foo")
postgres.commit()
cur.adbc_ingest("foo", table, mode="create", db_schema_name="testschema")
cur.execute("SELECT * FROM testschema.foo ORDER BY numbers")
assert cur.fetch_arrow_table() == table
def test_ingest_temporary(postgres: dbapi.Connection) -> None:
table = pyarrow.Table.from_pydict(
{
"numbers": [1, 2],
"letters": ["a", "b"],
}
)
temp = pyarrow.Table.from_pydict(
{
"ints": [3, 4],
"strs": ["c", "d"],
}
)
table2 = pyarrow.Table.from_pydict(
{
"numbers": [1, 2, 1, 2],
"letters": ["a", "b", "a", "b"],
}
)
temp2 = pyarrow.Table.from_pydict(
{
"ints": [3, 4, 3, 4],
"strs": ["c", "d", "c", "d"],
}
)
with postgres.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS public.temporary")
cur.execute("DROP TABLE IF EXISTS pg_temp.temporary")
cur.adbc_ingest("temporary", table, mode="create")
cur.adbc_ingest("temporary", temp, mode="create", temporary=True)
cur.execute("SELECT * FROM public.temporary")
assert cur.fetch_arrow_table() == table
cur.execute("SELECT * FROM pg_temp.temporary")
assert cur.fetch_arrow_table() == temp
cur.execute("SELECT * FROM temporary")
assert cur.fetch_arrow_table() == temp
cur.adbc_ingest("temporary", table, mode="append")
cur.adbc_ingest("temporary", temp, mode="append", temporary=True)
cur.execute("SELECT * FROM public.temporary")
assert cur.fetch_arrow_table() == table2
cur.execute("SELECT * FROM pg_temp.temporary")
assert cur.fetch_arrow_table() == temp2
cur.execute("SELECT * FROM temporary")
assert cur.fetch_arrow_table() == temp2
cur.adbc_ingest("temporary", table, mode="replace")
cur.adbc_ingest("temporary", temp, mode="replace", temporary=True)
cur.execute("SELECT * FROM public.temporary")
assert cur.fetch_arrow_table() == table
cur.execute("SELECT * FROM pg_temp.temporary")
assert cur.fetch_arrow_table() == temp
cur.execute("SELECT * FROM temporary")
assert cur.fetch_arrow_table() == temp
cur.adbc_ingest("temporary", table, mode="create_append")
cur.adbc_ingest("temporary", temp, mode="create_append", temporary=True)
cur.execute("SELECT * FROM public.temporary")
assert cur.fetch_arrow_table() == table2
cur.execute("SELECT * FROM pg_temp.temporary")
assert cur.fetch_arrow_table() == temp2
cur.execute("SELECT * FROM temporary")
assert cur.fetch_arrow_table() == temp2
def test_ingest_large(postgres: dbapi.Connection) -> None:
"""Regression test for #1921."""
# More than 1 GiB of data in one batch
arr = pyarrow.array(numpy.random.randint(-100, 100, size=4_000_000))
batch = pyarrow.RecordBatch.from_pydict(
{char: arr for char in string.ascii_lowercase}
)
table = pyarrow.Table.from_batches([batch] * 4)
with postgres.cursor() as cur:
cur.adbc_ingest("test_ingest_large", table, mode="replace")
def test_timestamp_txn(postgres: dbapi.Connection) -> None:
"""Regression test for #2410."""
ts = datetime.datetime.now()
ts_with_tz = datetime.datetime.now(tz=datetime.timezone.utc)
with postgres.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS ts_txn")
cur.execute("CREATE TABLE ts_txn (ts TIMESTAMP WITH TIME ZONE);")
postgres.commit()
with postgres.cursor() as cur:
cur.execute("INSERT INTO ts_txn VALUES ($1)", parameters=[ts])
cur.execute("SELECT pg_current_xact_id_if_assigned()")
assert cur.fetchone() != (None,)
postgres.commit()
with postgres.cursor() as cur:
cur.execute("INSERT INTO ts_txn VALUES ($1)", parameters=[ts_with_tz])
cur.execute("SELECT pg_current_xact_id_if_assigned()")
assert cur.fetchone() != (None,)
postgres.commit()
def test_txn_status(postgres: dbapi.Connection) -> None:
def status() -> str:
return postgres.adbc_connection.get_option(
ConnectionOptions.TRANSACTION_STATUS.value
)
assert status() == "intrans"
postgres.rollback()
assert status() == "intrans"
with postgres.cursor() as cur:
cur.execute("SELECT 1")
assert status() == "active"
postgres.commit()
assert status() == "intrans"
cur.execute("SELECT 1")
assert status() == "active"
postgres.rollback()
assert status() == "intrans"
def test_connect_conn_kwargs_db_schema(postgres_uri: str, postgres: dbapi.Connection):
"""Verify current DB schema can be set via conn_kwargs."""
schema_key = "adbc.connection.db_schema"
schema_name = "dbapi_test_schema_via_option"
with postgres.cursor() as cur:
cur.execute(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")
cur.execute(f"CREATE SCHEMA {schema_name}")
postgres.commit()
with dbapi.connect(postgres_uri, conn_kwargs={schema_key: schema_name}) as conn:
option_value = conn.adbc_connection.get_option(schema_key)
assert option_value == schema_name
def test_server_terminates_connection(postgres_uri: str) -> None:
"""Test that driver handles server terminating the connection gracefully.
Reproduces: https://github.com/apache/arrow-adbc/issues/3878
When the server terminates a connection, the driver should return an error
instead of crashing with a segfault.
"""
with dbapi.connect(postgres_uri) as conn1:
with dbapi.connect(postgres_uri) as conn2:
# Get the backend PID of conn2
with conn2.cursor() as cur:
cur.execute("SELECT pg_backend_pid()")
row = cur.fetchone()
assert row is not None
backend_pid = row[0]
conn2.commit()
# simulate a server side termination of a connection2
with conn1.cursor() as cur:
cur.execute(f"SELECT pg_terminate_backend({backend_pid})")
conn1.commit()
# Try to execute a query on the terminated connection
# This should raise an exception, NOT crash with a segfault
with pytest.raises(Exception):
with conn2.cursor() as cur:
cur.execute("SELECT 1")
# Tests for issue #3549: Cannot use null values as bound parameters
def test_bind_null_insert(postgres: dbapi.Connection) -> None:
"""Test INSERT with None parameter (issue #3549)."""
with postgres.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS test_null_binding")
cur.execute("CREATE TABLE test_null_binding(a TEXT, b INT)")
# This should not raise an error about mapping Arrow type 'na' to Postgres type
cur.execute("INSERT INTO test_null_binding VALUES ($1, $2)", ("hello", None))
postgres.commit()
with postgres.cursor() as cur:
cur.execute("SELECT a, b FROM test_null_binding")
result = cur.fetchone()
assert result is not None
assert result[0] == "hello"
assert result[1] is None
def test_bind_null_update(postgres: dbapi.Connection) -> None:
"""Test UPDATE with None parameter (issue #3549)."""
with postgres.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS test_null_binding")
cur.execute("CREATE TABLE test_null_binding(a TEXT, b INT)")
cur.execute("INSERT INTO test_null_binding VALUES ('hello', 42)")
postgres.commit()
with postgres.cursor() as cur:
# This should not raise an error
cur.execute("UPDATE test_null_binding SET b=$2 WHERE a=$1", ("hello", None))
postgres.commit()
with postgres.cursor() as cur:
cur.execute("SELECT a, b FROM test_null_binding WHERE a='hello'")
result = cur.fetchone()
assert result is not None
assert result[0] == "hello"
assert result[1] is None
def test_executemany_all_nulls(postgres: dbapi.Connection) -> None:
"""Test executemany with all None values (issue #3549)."""
with postgres.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS test_null_binding")
cur.execute("CREATE TABLE test_null_binding(a TEXT, b INT)")
postgres.commit()
with postgres.cursor() as cur:
# This is the critical test case from the issue
cur.executemany(
"INSERT INTO test_null_binding VALUES ($1, $2)",
[("hello", None), ("world", None)],
)
postgres.commit()
with postgres.cursor() as cur:
cur.execute("SELECT a, b FROM test_null_binding ORDER BY a")
rows = cur.fetchall()
assert len(rows) == 2
assert rows[0] == ("hello", None)
assert rows[1] == ("world", None)
def test_bind_multiple_null_parameters(postgres: dbapi.Connection) -> None:
"""Test binding multiple None parameters in a single statement (issue #3549)."""
with postgres.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS test_null_binding")
cur.execute("CREATE TABLE test_null_binding(a INT, b TEXT, c FLOAT)")
postgres.commit()
with postgres.cursor() as cur:
# All parameters are None
cur.execute(
"INSERT INTO test_null_binding VALUES ($1, $2, $3)", (None, None, None)
)
postgres.commit()
with postgres.cursor() as cur:
cur.execute("SELECT * FROM test_null_binding")
result = cur.fetchone()
assert result is not None
assert result[0] is None
assert result[1] is None
assert result[2] is None
def test_bind_null_unknown_inference(postgres: dbapi.Connection) -> None:
"""Test binding None where PostgreSQL can't infer a concrete parameter type."""
with postgres.cursor() as cur:
cur.execute("SELECT $1", (None,))
result = cur.fetchone()
assert result is not None
assert result[0] is None