Repository URL to install this package:
|
Version:
1.9.0 ▾
|
adbc-driver-manager
/
_lib.pyx
|
|---|
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: language_level = 3
# cython: freethreading_compatible=True
"""Low-level ADBC API."""
import enum
import functools
import threading
import os
import pathlib
import typing
import sys
import warnings
from typing import List, Optional, Tuple
import cython
from cpython.bytes cimport PyBytes_FromStringAndSize
from cpython.pycapsule cimport (
PyCapsule_GetPointer, PyCapsule_IsValid, PyCapsule_New
)
from libc.stdint cimport int64_t, uint8_t, uint32_t, uintptr_t
from libc.stdlib cimport malloc, free
from libc.string cimport memcpy, memset
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
if typing.TYPE_CHECKING:
from typing import Self # no-cython-lint
class AdbcStatusCode(enum.IntEnum):
"""
A status code indicating the type of error.
"""
OK = ADBC_STATUS_OK
UNKNOWN = ADBC_STATUS_UNKNOWN
NOT_IMPLEMENTED = ADBC_STATUS_NOT_IMPLEMENTED
NOT_FOUND = ADBC_STATUS_NOT_FOUND
ALREADY_EXISTS = ADBC_STATUS_ALREADY_EXISTS
INVALID_ARGUMENT = ADBC_STATUS_INVALID_ARGUMENT
INVALID_STATE = ADBC_STATUS_INVALID_STATE
INVALID_DATA = ADBC_STATUS_INVALID_DATA
INTEGRITY = ADBC_STATUS_INTEGRITY
INTERNAL = ADBC_STATUS_INTERNAL
IO = ADBC_STATUS_IO
CANCELLED = ADBC_STATUS_CANCELLED
TIMEOUT = ADBC_STATUS_TIMEOUT
UNAUTHENTICATED = ADBC_STATUS_UNAUTHENTICATED
UNAUTHORIZED = ADBC_STATUS_UNAUTHORIZED
class AdbcInfoCode(enum.IntEnum):
VENDOR_NAME = ADBC_INFO_VENDOR_NAME
VENDOR_VERSION = ADBC_INFO_VENDOR_VERSION
VENDOR_ARROW_VERSION = ADBC_INFO_VENDOR_ARROW_VERSION
DRIVER_NAME = ADBC_INFO_DRIVER_NAME
DRIVER_VERSION = ADBC_INFO_DRIVER_VERSION
DRIVER_ARROW_VERSION = ADBC_INFO_DRIVER_ARROW_VERSION
class Warning(UserWarning):
"""
PEP 249-compliant base warning class.
"""
class Error(Exception):
"""
PEP 249-compliant base exception class.
Attributes
----------
status_code : AdbcStatusCode
The original ADBC status code.
vendor_code : int, optional
A vendor-specific status code if present.
sqlstate : str, optional
The SQLSTATE code if present.
details : list[tuple[str, bytes]], optional
Additional error details, if present.
"""
def __init__(
self,
message,
*,
status_code,
vendor_code=None,
sqlstate=None,
details=None,
):
super().__init__(message)
self.status_code = AdbcStatusCode(status_code)
self.vendor_code = vendor_code
self.sqlstate = sqlstate
self.details = details or []
class InterfaceError(Error):
"""Errors related to the database interface."""
class DatabaseError(Error):
"""Errors related to the database."""
class DataError(DatabaseError):
"""Errors related to processed data."""
class OperationalError(DatabaseError):
"""Errors related to database operation, not under user control."""
class IntegrityError(DatabaseError):
"""Errors related to relational integrity."""
class InternalError(DatabaseError):
"""Errors related to database-internal errors."""
class ProgrammingError(DatabaseError):
"""Errors related to user errors."""
class NotSupportedError(DatabaseError):
"""An operation or some functionality is not supported."""
def __init__(self, message, *, vendor_code=None, sqlstate=None, details=None):
super().__init__(
message,
status_code=AdbcStatusCode.NOT_IMPLEMENTED,
vendor_code=vendor_code,
sqlstate=sqlstate,
details=details,
)
# XXX: shorten the traceback a bit (and avoid exposing _lib). We
# could also define the exceptions in __init__ but then we'd have a
# circular import situation
Error.__module__ = "adbc_driver_manager"
InterfaceError.__module__ = "adbc_driver_manager"
DatabaseError.__module__ = "adbc_driver_manager"
DataError.__module__ = "adbc_driver_manager"
OperationalError.__module__ = "adbc_driver_manager"
IntegrityError.__module__ = "adbc_driver_manager"
InternalError.__module__ = "adbc_driver_manager"
ProgrammingError.__module__ = "adbc_driver_manager"
NotSupportedError.__module__ = "adbc_driver_manager"
INGEST_OPTION_MODE = ADBC_INGEST_OPTION_MODE.decode("utf-8")
INGEST_OPTION_MODE_APPEND = ADBC_INGEST_OPTION_MODE_APPEND.decode("utf-8")
INGEST_OPTION_MODE_CREATE = ADBC_INGEST_OPTION_MODE_CREATE.decode("utf-8")
INGEST_OPTION_MODE_REPLACE = ADBC_INGEST_OPTION_MODE_REPLACE.decode("utf-8")
INGEST_OPTION_MODE_CREATE_APPEND = ADBC_INGEST_OPTION_MODE_CREATE_APPEND.decode("utf-8")
INGEST_OPTION_TARGET_TABLE = ADBC_INGEST_OPTION_TARGET_TABLE.decode("utf-8")
cdef object convert_error(CAdbcStatusCode status, CAdbcError* error):
cdef CAdbcErrorDetail c_detail
if status == ADBC_STATUS_OK:
return None
message = CAdbcStatusCodeMessage(status).decode("utf-8")
vendor_code = None
sqlstate = None
details = []
if error != NULL:
if error.message != NULL:
message += ": "
message += error.message.decode("utf-8", "replace")
if (
error.vendor_code and
error.vendor_code != ADBC_ERROR_VENDOR_CODE_PRIVATE_DATA
):
vendor_code = error.vendor_code
message += f". Vendor code: {vendor_code}"
if error.sqlstate[0] != 0:
sqlstate = bytes(error.sqlstate[i] for i in range(5))
sqlstate = sqlstate.decode("ascii", "replace")
message += f". SQLSTATE: {sqlstate}"
num_details = AdbcErrorGetDetailCount(error)
for index in range(num_details):
c_detail = AdbcErrorGetDetail(error, index)
if c_detail.key == NULL or c_detail.value == NULL:
# Shouldn't happen...
break
details.append(
(c_detail.key,
PyBytes_FromStringAndSize(
<const char*> c_detail.value,
c_detail.value_length)))
if error.release:
error.release(error)
klass = Error
if status in (ADBC_STATUS_INVALID_DATA,):
klass = DataError
elif status in (
ADBC_STATUS_IO,
ADBC_STATUS_CANCELLED,
ADBC_STATUS_TIMEOUT,
ADBC_STATUS_UNKNOWN,
):
klass = OperationalError
elif status in (ADBC_STATUS_INTEGRITY,):
klass = IntegrityError
elif status in (ADBC_STATUS_INTERNAL,):
klass = InternalError
elif status in (ADBC_STATUS_ALREADY_EXISTS,
ADBC_STATUS_INVALID_ARGUMENT,
ADBC_STATUS_INVALID_STATE,
ADBC_STATUS_NOT_FOUND,
ADBC_STATUS_UNAUTHENTICATED,
ADBC_STATUS_UNAUTHORIZED):
klass = ProgrammingError
elif status == ADBC_STATUS_NOT_IMPLEMENTED:
return NotSupportedError(
message,
vendor_code=vendor_code,
sqlstate=sqlstate,
details=details,
)
return klass(
message,
status_code=status,
vendor_code=vendor_code,
sqlstate=sqlstate,
details=details,
)
cdef void check_error(CAdbcStatusCode status, CAdbcError* error) except *:
if status == ADBC_STATUS_OK:
return
raise convert_error(status, error)
cdef CAdbcError empty_error():
cdef CAdbcError error
memset(&error, 0, cython.sizeof(error))
# We always want the extended error info
error.vendor_code = ADBC_ERROR_VENDOR_CODE_PRIVATE_DATA
return error
cdef bytes _to_bytes(obj, str name):
if isinstance(obj, bytes):
return obj
elif isinstance(obj, str):
return obj.encode("utf-8")
raise ValueError(f"{name} must be str or bytes")
def _test_error(status_code, message, vendor_code, sqlstate) -> Error:
cdef CAdbcError error
error.release = NULL
message = _to_bytes(message, "message")
error.message = message
if vendor_code:
error.vendor_code = vendor_code
else:
error.vendor_code = 0
if sqlstate:
sqlstate = sqlstate.encode("ascii")
else:
sqlstate = b"\0\0\0\0\0"
for i in range(5):
error.sqlstate[i] = sqlstate[i]
return check_error(AdbcStatusCode(status_code), &error)
cdef class _AdbcHandle:
"""
Base class for ADBC handles, which are context managers.
"""
cdef:
size_t _open_children
object _lock
str _child_type
def __init__(self, str child_type):
self._lock = threading.Lock()
self._child_type = child_type
def __enter__(self) -> "Self":
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.close()
cdef _open_child(self):
with self._lock:
self._open_children += 1
cdef _close_child(self):
with self._lock:
if self._open_children == 0:
raise RuntimeError(
f"Underflow in closing this {self._child_type}")
self._open_children -= 1
cdef _check_open_children(self):
with self._lock:
if self._open_children != 0:
raise RuntimeError(
f"Cannot close {self.__class__.__name__} "
f"with open {self._child_type}")
def is_pycapsule(obj, bytes name) -> bool:
"""Check if an object is a PyCapsule of a specific type."""
# Taken from nanoarrow
return PyCapsule_IsValid(obj, name) == 1
cdef void pycapsule_schema_deleter(object capsule) noexcept:
cdef CArrowSchema* allocated = <CArrowSchema*>PyCapsule_GetPointer(
capsule, "arrow_schema"
)
if allocated.release != NULL:
allocated.release(allocated)
free(allocated)
cdef void pycapsule_stream_deleter(object capsule) noexcept:
cdef CArrowArrayStream* allocated = <CArrowArrayStream*> PyCapsule_GetPointer(
capsule, "arrow_array_stream"
)
if allocated.release != NULL:
allocated.release(allocated)
free(allocated)
cdef class ArrowSchemaHandle:
"""
A wrapper for an allocated ArrowSchema.
This object implements the Arrow PyCapsule interface.
"""
cdef:
CArrowSchema schema
def __del__(self):
self.release()
@property
def address(self) -> int:
"""The address of the ArrowSchema."""
return <uintptr_t> &self.schema
@property
def is_valid(self) -> bool:
"""Check validility (object has non-NULL release pointer)."""
return self.schema.release != NULL
def release(self):
"""Release this schema without having to import it.
No-op if already released (if ``not self.is_valid``).
Postcondition: ``not self.is_valid``.
"""
if self.schema.release != NULL:
self.schema.release(&self.schema)
self.schema.release = NULL
def __arrow_c_schema__(self) -> object:
"""Consume this object to get a PyCapsule."""
# Reference:
# https://arrow.apache.org/docs/dev/format/CDataInterface/PyCapsuleInterface.html#create-a-pycapsule
cdef CArrowSchema* allocated = <CArrowSchema*> malloc(sizeof(CArrowSchema))
allocated.release = NULL
capsule = PyCapsule_New(
<void*>allocated, "arrow_schema", &pycapsule_schema_deleter,
)
memcpy(allocated, &self.schema, sizeof(CArrowSchema))
self.schema.release = NULL
return capsule
cdef class ArrowArrayHandle:
"""
A wrapper for an allocated ArrowArray.
This object implements the Arrow PyCapsule interface.
"""
cdef:
CArrowArray array
def __del__(self):
self.release()
@property
def is_valid(self) -> bool:
"""Check validility (object has non-NULL release pointer)."""
return self.array.release != NULL
def release(self):
"""Release this array without having to import it.
No-op if already released (if ``not self.is_valid``).
Postcondition: ``not self.is_valid``.
"""
if self.array.release != NULL:
self.array.release(&self.array)
self.array.release = NULL
@property
def address(self) -> int:
"""
The address of the ArrowArray.
"""
return <uintptr_t> &self.array
cdef class ArrowArrayStreamHandle:
"""
A wrapper for an allocated ArrowArrayStream.
This object implements the Arrow PyCapsule interface.
"""
cdef:
CArrowArrayStream stream
def __del__(self):
self.release()
@property
def address(self) -> int:
"""The address of the ArrowArrayStream."""
return <uintptr_t> &self.stream
@property
def is_valid(self) -> bool:
"""Check validility (object has non-NULL release pointer)."""
return self.stream.release != NULL
def release(self):
"""Release this stream without having to import it.
No-op if already released (if ``not self.is_valid``).
Postcondition: ``not self.is_valid``.
"""
if self.stream.release != NULL:
self.stream.release(&self.stream)
self.stream.release = NULL
def __arrow_c_schema__(self) -> object:
"""Get a PyCapsule without consuming this object."""
cdef const char* err = NULL
if not self.is_valid:
raise ValueError("ArrowArrayStreamHandle already consumed")
cdef CArrowSchema* allocated = <CArrowSchema*> malloc(sizeof(CArrowSchema))
allocated.release = NULL
capsule = PyCapsule_New(
<void*>allocated, "arrow_schema", &pycapsule_schema_deleter,
)
rc = self.stream.get_schema(&self.stream, allocated)
if rc != 0:
err = self.stream.get_last_error(&self.stream)
if err == NULL:
raise RuntimeError(f"Failed to get schema: ({rc})")
else:
s = err.decode()
raise RuntimeError(f"Failed to get schema: ({rc}) {s}")
return capsule
def __arrow_c_stream__(self, requested_schema=None) -> object:
"""Consume this object to get a PyCapsule."""
if requested_schema is not None:
raise NotImplementedError("requested_schema")
cdef CArrowArrayStream* allocated = \
<CArrowArrayStream*> malloc(sizeof(CArrowArrayStream))
allocated.release = NULL
capsule = PyCapsule_New(
<void*>allocated, "arrow_array_stream", &pycapsule_stream_deleter,
)
memcpy(allocated, &self.stream, sizeof(CArrowArrayStream))
self.stream.release = NULL
return capsule
class GetObjectsDepth(enum.IntEnum):
"""
How much data to fetch for adbc_get_objects.
"""
ALL = ADBC_OBJECT_DEPTH_ALL
CATALOGS = ADBC_OBJECT_DEPTH_CATALOGS
DB_SCHEMAS = ADBC_OBJECT_DEPTH_DB_SCHEMAS
TABLES = ADBC_OBJECT_DEPTH_TABLES
COLUMNS = ADBC_OBJECT_DEPTH_COLUMNS
# Assume a driver won't return more than 128 MiB of option data at
# once.
_MAX_OPTION_SIZE = 2**27
cdef class AdbcDatabase(_AdbcHandle):
"""
An instance of a database.
Parameters
----------
kwargs : dict
String key-value options to pass to the underlying database.
Must include at least "driver" to identify the underlying
database driver to load.
"""
cdef:
CAdbcDatabase database
def __init__(self, **kwargs) -> None:
super().__init__("AdbcConnection")
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
cdef const char* c_key
cdef const char* c_value
memset(&self.database, 0, cython.sizeof(CAdbcDatabase))
with nogil:
status = AdbcDatabaseNew(&self.database, &c_error)
check_error(status, &c_error)
# N.B. these calls are handled by the driver manager and do not do
# synchronous I/O, so there is no need to constantly release/acquire
# the GIL
# by default, allow searching for manifests and relative paths
status = AdbcDriverManagerDatabaseSetLoadFlags(
&self.database, CAdbcLoadFlagDefault, &c_error)
check_error(status, &c_error)
for key, value in kwargs.items():
if key == "init_func":
status = AdbcDriverManagerDatabaseSetInitFunc(
&self.database, <CAdbcDriverInitFunc> (<uintptr_t> value), &c_error)
elif key == "load_flags":
status = AdbcDriverManagerDatabaseSetLoadFlags(
&self.database, <CAdbcLoadFlags> (value), &c_error)
elif key is None:
raise ValueError("key cannot be None")
elif value is None:
raise ValueError(f"value for key '{key}' cannot be None")
else:
key = _to_bytes(key, "key")
if isinstance(value, pathlib.Path):
value = str(value)
value = _to_bytes(value, "value")
c_key = key
c_value = value
status = AdbcDatabaseSetOption(
&self.database, c_key, c_value, &c_error)
check_error(status, &c_error)
# check if we're running in a venv
if sys.prefix != sys.base_prefix:
# if we're in a venv, add the venv prefix to the search path list
status = AdbcDriverManagerDatabaseSetAdditionalSearchPathList(
&self.database, _to_bytes(os.path.join(sys.prefix, "etc/adbc/drivers"),
"sys.prefix"),
&c_error)
check_error(status, &c_error)
with nogil:
status = AdbcDatabaseInit(&self.database, &c_error)
check_error(status, &c_error)
def close(self) -> None:
"""Release the handle to the database."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
self._check_open_children()
with self._lock:
if self.database.private_data == NULL:
return
with nogil:
status = AdbcDatabaseRelease(&self.database, &c_error)
check_error(status, &c_error)
def get_option(
self,
key: str | bytes,
*,
encoding="utf-8",
errors="strict",
) -> str:
"""
Get the value of a string option.
Parameters
----------
key : str or bytes
The option to get.
encoding : str
The encoding of the option value. This should almost
always be UTF-8.
errors : str
What to do about errors when decoding the option value
(see bytes.decode).
"""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
key_bytes = _to_bytes(key, "key")
cdef char* c_key = key_bytes
cdef size_t c_len = 0
cdef char* c_buf = NULL
buf = bytearray(1024)
while True:
c_len = len(buf)
c_buf = buf
with nogil:
status = AdbcDatabaseGetOption(
&self.database, c_key, c_buf, &c_len, &c_error)
check_error(status, &c_error)
if c_len <= len(buf):
# Entire value read
break
else:
# Buffer too small
new_len = len(buf) * 2
if new_len > _MAX_OPTION_SIZE:
raise RuntimeError(
f"Could not read option {key}: "
f"would need more than {len(buf)} bytes")
buf = bytearray(new_len)
# Remove trailing null terminator
if c_len > 0:
c_len -= 1
return buf[:c_len].decode(encoding, errors)
def get_option_bytes(self, key: str) -> bytes:
"""Get the value of a binary option."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
key_bytes = _to_bytes(key, "key")
cdef char* c_key = key_bytes
cdef size_t c_len = 0
cdef uint8_t* c_buf = NULL
buf = bytearray(1024)
while True:
c_len = len(buf)
c_buf = buf
with nogil:
status = AdbcDatabaseGetOptionBytes(
&self.database, c_key, c_buf, &c_len, &c_error)
check_error(status, &c_error)
if c_len <= len(buf):
# Entire value read
break
else:
# Buffer too small
new_len = len(buf) * 2
if new_len > _MAX_OPTION_SIZE:
raise RuntimeError(
f"Could not read option {key}: "
f"would need more than {len(buf)} bytes")
buf = bytearray(new_len)
return bytes(buf[:c_len])
def get_option_float(self, key: str) -> float:
"""Get the value of a floating-point option."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
key_bytes = _to_bytes(key, "key")
cdef char* c_key = key_bytes
cdef double c_value = 0.0
with nogil:
status = AdbcDatabaseGetOptionDouble(
&self.database, c_key, &c_value, &c_error)
check_error(status, &c_error)
return c_value
def get_option_int(self, key: str) -> int:
"""Get the value of an integer option."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
key_bytes = _to_bytes(key, "key")
cdef char* c_key = key_bytes
cdef int64_t c_value = 0
with nogil:
status = AdbcDatabaseGetOptionInt(
&self.database, c_key, &c_value, &c_error)
check_error(status, &c_error)
return c_value
def set_options(self, **kwargs) -> None:
"""
Set arbitrary key-value options.
Pass options as kwargs: ``set_options(**{"some.option": "value"})``.
Note, not all drivers support setting options after creation.
See Also
--------
adbc_driver_manager.DatabaseOptions : Standard option names.
"""
cdef CAdbcError c_error = empty_error()
cdef char* c_key = NULL
cdef char* c_value = NULL
cdef size_t c_value_len = 0
cdef double c_double
cdef int64_t c_int
for key, value in kwargs.items():
key = _to_bytes(key, "option key")
c_key = key
if value is None:
c_value = NULL
status = AdbcDatabaseSetOption(
&self.database, c_key, c_value, &c_error)
elif isinstance(value, str):
value = _to_bytes(value, "option value")
c_value = value
status = AdbcDatabaseSetOption(
&self.database, c_key, c_value, &c_error)
elif isinstance(value, bool):
if value:
value = ADBC_OPTION_VALUE_ENABLED
else:
value = ADBC_OPTION_VALUE_DISABLED
value = _to_bytes(value, "option value")
c_value = value
status = AdbcDatabaseSetOption(
&self.database, c_key, c_value, &c_error)
elif isinstance(value, bytes):
c_value = value
c_value_len = len(value)
with nogil:
status = AdbcDatabaseSetOptionBytes(
&self.database, c_key, <const uint8_t*> c_value,
c_value_len, &c_error)
elif isinstance(value, float):
c_double = value
with nogil:
status = AdbcDatabaseSetOptionDouble(
&self.database, c_key, c_double, &c_error)
elif isinstance(value, int):
c_int = value
with nogil:
status = AdbcDatabaseSetOptionInt(
&self.database, c_key, c_int, &c_error)
else:
raise ValueError(
f"Unsupported type {type(value)} for value {value!r} "
f"of option {key}")
check_error(status, &c_error)
cdef class AdbcConnection(_AdbcHandle):
"""
An active database connection.
Connections are not thread-safe and clients should take care to
serialize accesses to a connection.
Parameters
----------
database : AdbcDatabase
The database to connect to.
kwargs : dict
String key-value options to pass to the underlying database.
"""
cdef:
AdbcDatabase database
CAdbcConnection connection
def __init__(self, AdbcDatabase database, **kwargs) -> None:
super().__init__("AdbcStatement")
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
cdef const char* c_key
cdef const char* c_value
self.database = database
memset(&self.connection, 0, cython.sizeof(CAdbcConnection))
with nogil:
status = AdbcConnectionNew(&self.connection, &c_error)
check_error(status, &c_error)
for key, value in kwargs.items():
key = key.encode("utf-8")
value = value.encode("utf-8")
c_key = key
c_value = value
with nogil:
status = AdbcConnectionSetOption(
&self.connection, c_key, c_value, &c_error)
if status != ADBC_STATUS_OK:
AdbcConnectionRelease(&self.connection, NULL)
check_error(status, &c_error)
with nogil:
status = AdbcConnectionInit(&self.connection, &database.database, &c_error)
if status != ADBC_STATUS_OK:
AdbcConnectionRelease(&self.connection, NULL)
check_error(status, &c_error)
database._open_child()
def cancel(self) -> None:
"""Attempt to cancel any ongoing operations on the connection."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
with nogil:
status = AdbcConnectionCancel(&self.connection, &c_error)
check_error(status, &c_error)
def commit(self) -> None:
"""Commit the current transaction."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
with nogil:
status = AdbcConnectionCommit(&self.connection, &c_error)
check_error(status, &c_error)
def get_info(self, info_codes=None) -> ArrowArrayStreamHandle:
"""
Get metadata about the database/driver.
"""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
cdef ArrowArrayStreamHandle stream = ArrowArrayStreamHandle()
cdef c_vector[uint32_t] c_info_codes
if info_codes:
for info_code in info_codes:
if isinstance(info_code, int):
c_info_codes.push_back(info_code)
else:
c_info_codes.push_back(info_code.value)
with nogil:
status = AdbcConnectionGetInfo(
&self.connection,
c_info_codes.data(),
c_info_codes.size(),
&stream.stream,
&c_error)
else:
with nogil:
status = AdbcConnectionGetInfo(
&self.connection,
NULL,
0,
&stream.stream,
&c_error)
check_error(status, &c_error)
return stream
def get_objects(self, depth, catalog=None, db_schema=None, table_name=None,
table_types=None, column_name=None) -> ArrowArrayStreamHandle:
"""
Get a hierarchical view of database objects.
"""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
cdef ArrowArrayStreamHandle stream = ArrowArrayStreamHandle()
cdef int c_depth = GetObjectsDepth(depth).value
cdef char* c_catalog = NULL
if catalog is not None:
catalog = _to_bytes(catalog, "catalog")
c_catalog = catalog
cdef char* c_db_schema = NULL
if db_schema is not None:
db_schema = _to_bytes(db_schema, "db_schema")
c_db_schema = db_schema
cdef char* c_table_name = NULL
if table_name is not None:
table_name = _to_bytes(table_name, "table_name")
c_table_name = table_name
cdef char* c_column_name = NULL
if column_name is not None:
column_name = _to_bytes(column_name, "column_name")
c_column_name = column_name
with nogil:
status = AdbcConnectionGetObjects(
&self.connection,
c_depth,
c_catalog,
c_db_schema,
c_table_name,
NULL, # TODO: support table_types
c_column_name,
&stream.stream,
&c_error)
check_error(status, &c_error)
return stream
def get_option(
self,
key: str | bytes,
*,
encoding="utf-8",
errors="strict",
) -> str:
"""
Get the value of a string option.
Parameters
----------
key : str or bytes
The option to get.
encoding : str
The encoding of the option value. This should almost
always be UTF-8.
errors : str
What to do about errors when decoding the option value
(see bytes.decode).
"""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
key_bytes = _to_bytes(key, "key")
cdef char* c_key = key_bytes
cdef size_t c_len = 0
cdef char* c_buf = NULL
buf = bytearray(1024)
while True:
c_len = len(buf)
c_buf = buf
with nogil:
status = AdbcConnectionGetOption(
&self.connection, c_key, c_buf, &c_len, &c_error)
check_error(status, &c_error)
if c_len <= len(buf):
# Entire value read
break
else:
# Buffer too small
new_len = len(buf) * 2
if new_len > _MAX_OPTION_SIZE:
raise RuntimeError(
f"Could not read option {key}: "
f"would need more than {len(buf)} bytes")
buf = bytearray(new_len)
# Remove trailing null terminator
if c_len > 0:
c_len -= 1
return buf[:c_len].decode(encoding, errors)
def get_option_bytes(self, key: str) -> bytes:
"""Get the value of a binary option."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
key_bytes = _to_bytes(key, "key")
cdef char* c_key = key_bytes
cdef size_t c_len = 0
cdef uint8_t* c_buf = NULL
buf = bytearray(1024)
while True:
c_len = len(buf)
c_buf = buf
with nogil:
status = AdbcConnectionGetOptionBytes(
&self.connection, c_key, c_buf, &c_len, &c_error)
check_error(status, &c_error)
if c_len <= len(buf):
# Entire value read
break
else:
# Buffer too small
new_len = len(buf) * 2
if new_len > _MAX_OPTION_SIZE:
raise RuntimeError(
f"Could not read option {key}: "
f"would need more than {len(buf)} bytes")
buf = bytearray(new_len)
return bytes(buf[:c_len])
def get_option_float(self, key: str) -> float:
"""Get the value of a floating-point option."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
key_bytes = _to_bytes(key, "key")
cdef char* c_key = key_bytes
cdef double c_value = 0.0
with nogil:
status = AdbcConnectionGetOptionDouble(
&self.connection, c_key, &c_value, &c_error)
check_error(status, &c_error)
return c_value
def get_option_int(self, key: str) -> int:
"""Get the value of an integer option."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
key_bytes = _to_bytes(key, "key")
cdef char* c_key = key_bytes
cdef int64_t c_value = 0
with nogil:
status = AdbcConnectionGetOptionInt(
&self.connection, c_key, &c_value, &c_error)
check_error(status, &c_error)
return c_value
def get_table_schema(self, catalog, db_schema, table_name) -> ArrowSchemaHandle:
"""
Get the Arrow schema of a table.
Returns
-------
ArrowSchemaHandle
A C Data Interface ArrowSchema struct containing the schema.
"""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
cdef ArrowSchemaHandle handle = ArrowSchemaHandle()
table_name = _to_bytes(table_name, "table_name")
cdef char* c_table_name = table_name
cdef char* c_catalog = NULL
if catalog is not None:
catalog = _to_bytes(catalog, "catalog")
c_catalog = catalog
cdef char* c_db_schema = NULL
if db_schema is not None:
db_schema = _to_bytes(db_schema, "db_schema")
c_db_schema = db_schema
with nogil:
status = AdbcConnectionGetTableSchema(
&self.connection,
c_catalog,
c_db_schema,
c_table_name,
&handle.schema,
&c_error)
check_error(status, &c_error)
return handle
def get_table_types(self) -> ArrowArrayStreamHandle:
"""
Get the list of supported table types.
"""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
cdef ArrowArrayStreamHandle stream = ArrowArrayStreamHandle()
with nogil:
status = AdbcConnectionGetTableTypes(
&self.connection, &stream.stream, &c_error)
check_error(status, &c_error)
return stream
def read_partition(self, bytes partition not None) -> ArrowArrayStreamHandle:
"""Fetch a single partition from execute_partitions."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
cdef ArrowArrayStreamHandle stream = ArrowArrayStreamHandle()
cdef const uint8_t* data = <const uint8_t*> partition
cdef size_t length = len(partition)
with nogil:
status = AdbcConnectionReadPartition(
&self.connection,
data,
length,
&stream.stream,
&c_error)
check_error(status, &c_error)
return stream
def rollback(self) -> None:
"""Rollback the current transaction."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
with nogil:
status = AdbcConnectionRollback(&self.connection, &c_error)
check_error(status, &c_error)
def set_autocommit(self, bint enabled) -> None:
"""Toggle whether autocommit is enabled."""
cdef CAdbcError c_error = empty_error()
if enabled:
value = ADBC_OPTION_VALUE_ENABLED
else:
value = ADBC_OPTION_VALUE_DISABLED
with nogil:
status = AdbcConnectionSetOption(
&self.connection,
ADBC_CONNECTION_OPTION_AUTOCOMMIT,
value,
&c_error)
check_error(status, &c_error)
def set_options(self, **kwargs) -> None:
"""
Set arbitrary key-value options.
Pass options as kwargs: ``set_options(**{"some.option": "value"})``.
Note, not all drivers support setting options after creation.
See Also
--------
adbc_driver_manager.ConnectionOptions : Standard option names.
"""
cdef CAdbcError c_error = empty_error()
cdef char* c_key = NULL
cdef char* c_value = NULL
cdef size_t c_value_len
cdef double c_double
cdef int64_t c_int
for key, value in kwargs.items():
key = _to_bytes(key, "option key")
c_key = key
if value is None:
c_value = NULL
with nogil:
status = AdbcConnectionSetOption(
&self.connection, c_key, c_value, &c_error)
elif isinstance(value, str):
value = _to_bytes(value, "option value")
c_value = value
with nogil:
status = AdbcConnectionSetOption(
&self.connection, c_key, c_value, &c_error)
elif isinstance(value, bool):
if value:
value = ADBC_OPTION_VALUE_ENABLED
else:
value = ADBC_OPTION_VALUE_DISABLED
value = _to_bytes(value, "option value")
c_value = value
with nogil:
status = AdbcConnectionSetOption(
&self.connection, c_key, c_value, &c_error)
elif isinstance(value, bytes):
c_value = value
c_value_len = len(value)
with nogil:
status = AdbcConnectionSetOptionBytes(
&self.connection, c_key, <const uint8_t*> c_value,
c_value_len, &c_error)
elif isinstance(value, float):
c_double = value
with nogil:
status = AdbcConnectionSetOptionDouble(
&self.connection, c_key, c_double, &c_error)
elif isinstance(value, int):
c_int = value
with nogil:
status = AdbcConnectionSetOptionInt(
&self.connection, c_key, c_int, &c_error)
else:
raise ValueError(
f"Unsupported type {type(value)} for value {value!r} "
f"of option {key}")
check_error(status, &c_error)
def close(self) -> None:
"""Release the handle to the connection."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
self._check_open_children()
with self._lock:
if self.connection.private_data == NULL:
return
with nogil:
status = AdbcConnectionRelease(&self.connection, &c_error)
check_error(status, &c_error)
self.database._close_child()
cdef class AdbcStatement(_AdbcHandle):
"""
A database statement.
Statements are not thread-safe and clients should take care to
serialize accesses to a connection.
Parameters
----------
connection : AdbcConnection
The connection to create the statement for.
"""
cdef:
AdbcConnection connection
CAdbcStatement statement
def __init__(self, AdbcConnection connection) -> None:
super().__init__("(no child type)")
cdef CAdbcError c_error = empty_error()
self.connection = connection
memset(&self.statement, 0, cython.sizeof(CAdbcStatement))
with nogil:
status = AdbcStatementNew(
&connection.connection,
&self.statement,
&c_error)
check_error(status, &c_error)
connection._open_child()
def bind(self, data, schema=None) -> None:
"""
Bind an ArrowArray to this statement.
Parameters
----------
data : PyCapsule or int or ArrowArrayHandle
schema : PyCapsule or int or ArrowSchemaHandle
"""
cdef CAdbcError c_error = empty_error()
cdef CArrowArray* c_array
cdef CArrowSchema* c_schema
if (hasattr(data, "__arrow_c_array__") and
not isinstance(data, ArrowArrayHandle)):
if schema is not None:
raise ValueError(
"Can not provide a schema when passing Arrow-compatible "
"data that implements the Arrow PyCapsule Protocol"
)
schema, data = data.__arrow_c_array__()
if is_pycapsule(data, b"arrow_array"):
c_array = <CArrowArray*> PyCapsule_GetPointer(data, "arrow_array")
elif isinstance(data, ArrowArrayHandle):
c_array = &(<ArrowArrayHandle> data).array
elif isinstance(data, int):
c_array = <CArrowArray*> data
else:
raise TypeError(
"data must be Arrow-compatible data (implementing the Arrow PyCapsule "
f"Protocol), a PyCapsule, int or ArrowArrayHandle, not {type(data)}"
)
if is_pycapsule(schema, b"arrow_schema"):
c_schema = <CArrowSchema*> PyCapsule_GetPointer(schema, "arrow_schema")
elif isinstance(schema, ArrowSchemaHandle):
c_schema = &(<ArrowSchemaHandle> schema).schema
elif isinstance(schema, int):
c_schema = <CArrowSchema*> schema
else:
raise TypeError("schema must be a PyCapsule, int or ArrowSchemaHandle, "
f"not {type(schema)}")
with nogil:
status = AdbcStatementBind(
&self.statement,
c_array,
c_schema,
&c_error)
check_error(status, &c_error)
def bind_stream(self, stream) -> None:
"""
Bind an ArrowArrayStream to this statement.
Parameters
----------
stream : PyCapsule or int or ArrowArrayStreamHandle
"""
cdef CAdbcError c_error = empty_error()
cdef CArrowArrayStream* c_stream
if (
hasattr(stream, "__arrow_c_stream__")
and not isinstance(stream, ArrowArrayStreamHandle)
):
stream = stream.__arrow_c_stream__()
if is_pycapsule(stream, b"arrow_array_stream"):
c_stream = <CArrowArrayStream*> PyCapsule_GetPointer(
stream, "arrow_array_stream"
)
elif isinstance(stream, ArrowArrayStreamHandle):
c_stream = &(<ArrowArrayStreamHandle> stream).stream
elif isinstance(stream, int):
c_stream = <CArrowArrayStream*> stream
else:
raise TypeError(f"data must be a PyCapsule, int or ArrowArrayStreamHandle, "
f"not {type(stream)}")
with nogil:
status = AdbcStatementBindStream(
&self.statement,
c_stream,
&c_error)
check_error(status, &c_error)
def cancel(self) -> None:
"""Attempt to cancel any ongoing operations on the connection."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
with nogil:
status = AdbcStatementCancel(&self.statement, &c_error)
check_error(status, &c_error)
def close(self) -> None:
"""Release the handle to the statement."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
self.connection._close_child()
with self._lock:
if self.statement.private_data == NULL:
return
with nogil:
status = AdbcStatementRelease(&self.statement, &c_error)
check_error(status, &c_error)
def execute_query(self) -> Tuple[ArrowArrayStreamHandle, int]:
"""
Execute the query and get the result set.
Returns
-------
ArrowArrayStreamHandle
The result set.
int
The number of rows if known, else -1.
"""
cdef CAdbcError c_error = empty_error()
cdef ArrowArrayStreamHandle stream = ArrowArrayStreamHandle()
cdef int64_t rows_affected = 0
with nogil:
status = AdbcStatementExecuteQuery(
&self.statement,
&stream.stream,
&rows_affected,
&c_error)
check_error(status, &c_error)
return (stream, rows_affected)
def execute_partitions(self) -> \
Tuple[List[bytes], Optional[ArrowSchemaHandle], int]:
"""
Execute the query and get the partitions of the result set.
Not all drivers will support this.
Returns
-------
list of byte
The partitions of the distributed result set.
ArrowSchemaHandle or None
The schema of the result set. May be None if incremental
execution is enabled and the server does not return a schema.
int
The number of rows if known, else -1.
"""
cdef CAdbcError c_error = empty_error()
cdef ArrowSchemaHandle schema = ArrowSchemaHandle()
cdef CAdbcPartitions c_partitions = CAdbcPartitions(
0, NULL, NULL, NULL, NULL)
cdef int64_t rows_affected = 0
with nogil:
status = AdbcStatementExecutePartitions(
&self.statement,
&schema.schema,
&c_partitions,
&rows_affected,
&c_error)
check_error(status, &c_error)
partitions = []
for i in range(c_partitions.num_partitions):
length = c_partitions.partition_lengths[i]
data = <const char*> c_partitions.partitions[i]
partitions.append(PyBytes_FromStringAndSize(data, length))
c_partitions.release(&c_partitions)
if schema.schema.release == NULL:
return partitions, None, rows_affected
return partitions, schema, rows_affected
def execute_schema(self) -> ArrowSchemaHandle:
"""
Get the schema of the result set without executing the query.
Returns
-------
ArrowSchemaHandle
The schema of the result set.
"""
cdef CAdbcError c_error = empty_error()
cdef ArrowSchemaHandle schema = ArrowSchemaHandle()
with nogil:
status = AdbcStatementExecuteSchema(
&self.statement,
&schema.schema,
&c_error)
check_error(status, &c_error)
return schema
def execute_update(self) -> int:
"""
Execute the query without a result set.
Returns
-------
int
The number of affected rows if known, else -1.
"""
cdef CAdbcError c_error = empty_error()
cdef int64_t rows_affected = 0
with nogil:
status = AdbcStatementExecuteQuery(
&self.statement,
NULL,
&rows_affected,
&c_error)
check_error(status, &c_error)
return rows_affected
def get_option(
self,
key: str | bytes,
*,
encoding="utf-8",
errors="strict",
) -> str:
"""
Get the value of a string option.
Parameters
----------
key : str or bytes
The option to get.
encoding : str
The encoding of the option value. This should almost
always be UTF-8.
errors : str
What to do about errors when decoding the option value
(see bytes.decode).
"""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
key_bytes = _to_bytes(key, "key")
cdef char* c_key = key_bytes
cdef size_t c_len = 0
cdef char* c_buf = NULL
buf = bytearray(1024)
while True:
c_len = len(buf)
c_buf = buf
with nogil:
status = AdbcStatementGetOption(
&self.statement, c_key, c_buf, &c_len, &c_error)
check_error(status, &c_error)
if c_len <= len(buf):
# Entire value read
break
else:
# Buffer too small
new_len = len(buf) * 2
if new_len > _MAX_OPTION_SIZE:
raise RuntimeError(
f"Could not read option {key}: "
f"would need more than {len(buf)} bytes")
buf = bytearray(new_len)
# Remove trailing null terminator
if c_len > 0:
c_len -= 1
return buf[:c_len].decode(encoding, errors)
def get_option_bytes(self, key: str) -> bytes:
"""Get the value of a binary option."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
key_bytes = _to_bytes(key, "key")
cdef char* c_key = key_bytes
cdef size_t c_len = 0
cdef uint8_t* c_buf = NULL
buf = bytearray(1024)
while True:
c_len = len(buf)
c_buf = buf
with nogil:
status = AdbcStatementGetOptionBytes(
&self.statement, c_key, c_buf, &c_len, &c_error)
check_error(status, &c_error)
if c_len <= len(buf):
# Entire value read
break
else:
# Buffer too small
new_len = len(buf) * 2
if new_len > _MAX_OPTION_SIZE:
raise RuntimeError(
f"Could not read option {key}: "
f"would need more than {len(buf)} bytes")
buf = bytearray(new_len)
return bytes(buf[:c_len])
def get_option_float(self, key: str) -> float:
"""Get the value of a floating-point option."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
key_bytes = _to_bytes(key, "key")
cdef char* c_key = key_bytes
cdef double c_value = 0.0
with nogil:
status = AdbcStatementGetOptionDouble(
&self.statement, c_key, &c_value, &c_error)
check_error(status, &c_error)
return c_value
def get_option_int(self, key: str) -> int:
"""Get the value of an integer option."""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
key_bytes = _to_bytes(key, "key")
cdef char* c_key = key_bytes
cdef int64_t c_value = 0
with nogil:
status = AdbcStatementGetOptionInt(
&self.statement, c_key, &c_value, &c_error)
check_error(status, &c_error)
return c_value
def get_parameter_schema(self) -> ArrowSchemaHandle:
"""Get the Arrow schema for bound parameters.
This retrieves an Arrow schema describing the number, names,
and types of the parameters in a parameterized statement. The
fields of the schema should be in order of the ordinal
position of the parameters; named parameters should appear
only once.
If the parameter does not have a name, or the name cannot be
determined, the name of the corresponding field in the schema
will be an empty string. If the type cannot be determined,
the type of the corresponding field will be NA (NullType).
This should be called after :meth:`prepare`.
Raises
------
NotSupportedError
If the schema could not be determined.
"""
cdef CAdbcError c_error = empty_error()
cdef CAdbcStatusCode status
cdef ArrowSchemaHandle handle = ArrowSchemaHandle()
with nogil:
status = AdbcStatementGetParameterSchema(
&self.statement, &handle.schema, &c_error)
check_error(status, &c_error)
return handle
def prepare(self) -> None:
"""Turn this statement into a prepared statement."""
cdef CAdbcError c_error = empty_error()
with nogil:
status = AdbcStatementPrepare(&self.statement, &c_error)
check_error(status, &c_error)
def set_options(self, **kwargs) -> None:
"""Set arbitrary key-value options for this statement only.
Pass options as kwargs: ``set_options(**{"some.option": "value"})``.
Note, not all drivers support setting options after creation.
See Also
--------
adbc_driver_manager.StatementOptions : Standard option names.
"""
cdef CAdbcError c_error = empty_error()
cdef char* c_key = NULL
cdef char* c_value = NULL
cdef size_t c_value_len
cdef double c_double
cdef int64_t c_int
for key, value in kwargs.items():
key = _to_bytes(key, "option key")
c_key = key
if value is None:
c_value = NULL
with nogil:
status = AdbcStatementSetOption(
&self.statement, c_key, c_value, &c_error)
elif isinstance(value, str):
value = _to_bytes(value, "option value")
c_value = value
with nogil:
status = AdbcStatementSetOption(
&self.statement, c_key, c_value, &c_error)
elif isinstance(value, bool):
if value:
value = ADBC_OPTION_VALUE_ENABLED
else:
value = ADBC_OPTION_VALUE_DISABLED
value = _to_bytes(value, "option value")
c_value = value
with nogil:
status = AdbcStatementSetOption(
&self.statement, c_key, c_value, &c_error)
elif isinstance(value, bytes):
c_value = value
c_value_len = len(value)
with nogil:
status = AdbcStatementSetOptionBytes(
&self.statement, c_key, <const uint8_t*> c_value,
c_value_len, &c_error)
elif isinstance(value, float):
c_double = value
with nogil:
status = AdbcStatementSetOptionDouble(
&self.statement, c_key, c_double, &c_error)
elif isinstance(value, int):
c_int = value
with nogil:
status = AdbcStatementSetOptionInt(
&self.statement, c_key, c_int, &c_error)
else:
raise ValueError(
f"Unsupported type {type(value)} for value {value!r} "
f"of option {key}")
check_error(status, &c_error)
def set_sql_query(self, str query not None) -> None:
"""Set a SQL query to be executed."""
cdef CAdbcError c_error = empty_error()
cdef bytes query_data = query.encode("utf-8")
cdef const char* c_query = query_data
with nogil:
status = AdbcStatementSetSqlQuery(
&self.statement, c_query, &c_error)
check_error(status, &c_error)
def set_substrait_plan(self, bytes plan not None) -> None:
"""Set a Substrait plan to be executed."""
cdef CAdbcError c_error = empty_error()
cdef const uint8_t* c_plan = <const uint8_t*> plan
cdef size_t length = len(plan)
with nogil:
status = AdbcStatementSetSubstraitPlan(
&self.statement, c_plan, length, &c_error)
check_error(status, &c_error)
cdef const CAdbcError* PyAdbcErrorFromArrayStream(
CArrowArrayStream* stream, CAdbcStatusCode* status):
return AdbcErrorFromArrayStream(stream, status)
cdef extern from "_blocking_impl.h" nogil:
ctypedef void (*BlockingCallback)(void*) noexcept nogil
c_string CInitBlockingCallback"pyadbc_driver_manager::InitBlockingCallback"()
c_string CSetBlockingCallback"pyadbc_driver_manager::SetBlockingCallback"(
BlockingCallback, void* data)
c_string CClearBlockingCallback"pyadbc_driver_manager::ClearBlockingCallback"()
@functools.lru_cache
def _init_blocking_call():
error = bytes(CInitBlockingCallback()).decode("utf-8")
if error:
warnings.warn(
f"Failed to initialize KeyboardInterrupt support: {error}",
RuntimeWarning,
)
_blocking_lock = threading.Lock()
_blocking_exc = None
def _blocking_call_impl(func, args, kwargs, cancel):
"""
Run functions that are expected to block with a native SIGINT handler.
Parameters
----------
"""
global _blocking_exc
if threading.current_thread() is not threading.main_thread():
return func(*args, **kwargs)
_init_blocking_call()
with _blocking_lock:
if _blocking_exc:
_blocking_exc = None
# Set the callback for the background thread and save the signal handler
# TODO: ideally this would be no-op if already set
error = bytes(
CSetBlockingCallback(&_handle_blocking_call, <void*>cancel)
).decode("utf-8")
if error:
warnings.warn(
f"Failed to set SIGINT handler: {error}",
RuntimeWarning,
)
try:
return func(*args, **kwargs)
except BaseException as e:
with _blocking_lock:
if _blocking_exc:
exc = _blocking_exc
_blocking_exc = None
raise e from exc[1].with_traceback(exc[2])
raise e
finally:
# Restore the signal handler
error = bytes(CClearBlockingCallback()).decode("utf-8")
if error:
warnings.warn(
f"Failed to restore SIGINT handler: {error}",
RuntimeWarning,
)
with _blocking_lock:
if _blocking_exc:
exc = _blocking_exc
_blocking_exc = None
raise exc[1].with_traceback(exc[2]) from KeyboardInterrupt
if os.name != "nt":
# https://github.com/apache/arrow-adbc/issues/1522
_blocking_call = _blocking_call_impl
else:
def _blocking_call(func, args, kwargs, cancel):
return func(*args, **kwargs)
cdef void _handle_blocking_call(void* c_cancel) noexcept nogil:
with gil:
try:
cancel = <object> c_cancel
cancel()
except: # no-cython-lint
with _blocking_lock:
global _blocking_exc
_blocking_exc = sys.exc_info()