Repository URL to install this package:
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: language_level = 3
"""Low-level ADBC API."""
import enum
import functools
import threading
import os
import typing
import sys
import warnings
from typing import List, Optional, Tuple
import cython
from cpython.bytes cimport PyBytes_FromStringAndSize
from cpython.pycapsule cimport (
PyCapsule_GetPointer, PyCapsule_New, PyCapsule_CheckExact
)
from libc.stdint cimport int64_t, uint8_t, uint32_t, uintptr_t
from libc.stdlib cimport malloc, free
from libc.string cimport memcpy, memset
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
if typing.TYPE_CHECKING:
from typing import Self # no-cython-lint
class AdbcStatusCode(enum.IntEnum):
"""
A status code indicating the type of error.
"""
OK = ADBC_STATUS_OK
UNKNOWN = ADBC_STATUS_UNKNOWN
NOT_IMPLEMENTED = ADBC_STATUS_NOT_IMPLEMENTED
NOT_FOUND = ADBC_STATUS_NOT_FOUND
ALREADY_EXISTS = ADBC_STATUS_ALREADY_EXISTS
INVALID_ARGUMENT = ADBC_STATUS_INVALID_ARGUMENT
INVALID_STATE = ADBC_STATUS_INVALID_STATE
INVALID_DATA = ADBC_STATUS_INVALID_DATA
INTEGRITY = ADBC_STATUS_INTEGRITY
INTERNAL = ADBC_STATUS_INTERNAL
IO = ADBC_STATUS_IO
CANCELLED = ADBC_STATUS_CANCELLED
TIMEOUT = ADBC_STATUS_TIMEOUT
UNAUTHENTICATED = ADBC_STATUS_UNAUTHENTICATED
UNAUTHORIZED = ADBC_STATUS_UNAUTHORIZED
class AdbcInfoCode(enum.IntEnum):
VENDOR_NAME = ADBC_INFO_VENDOR_NAME
VENDOR_VERSION = ADBC_INFO_VENDOR_VERSION
VENDOR_ARROW_VERSION = ADBC_INFO_VENDOR_ARROW_VERSION
DRIVER_NAME = ADBC_INFO_DRIVER_NAME
DRIVER_VERSION = ADBC_INFO_DRIVER_VERSION
DRIVER_ARROW_VERSION = ADBC_INFO_DRIVER_ARROW_VERSION
class Warning(UserWarning):
"""
PEP 249-compliant base warning class.
"""
class Error(Exception):
"""
PEP 249-compliant base exception class.
Attributes
----------
status_code : AdbcStatusCode
The original ADBC status code.
vendor_code : int, optional
A vendor-specific status code if present.
sqlstate : str, optional
The SQLSTATE code if present.
details : list[tuple[str, bytes]], optional
Additional error details, if present.
"""
def __init__(
self,
message,
*,
status_code,
vendor_code=None,
sqlstate=None,
details=None,
):
super().__init__(message)
self.status_code = AdbcStatusCode(status_code)
self.vendor_code = vendor_code
self.sqlstate = sqlstate
self.details = details or []
class InterfaceError(Error):
"""Errors related to the database interface."""
class DatabaseError(Error):
"""Errors related to the database."""
class DataError(DatabaseError):
"""Errors related to processed data."""
class OperationalError(DatabaseError):
"""Errors related to database operation, not under user control."""
class IntegrityError(DatabaseError):
"""Errors related to relational integrity."""
class InternalError(DatabaseError):
"""Errors related to database-internal errors."""
class ProgrammingError(DatabaseError):
"""Errors related to user errors."""
class NotSupportedError(DatabaseError):
"""An operation or some functionality is not supported."""
def __init__(self, message, *, vendor_code=None, sqlstate=None, details=None):
super().__init__(
message,
status_code=AdbcStatusCode.NOT_IMPLEMENTED,
vendor_code=vendor_code,
sqlstate=sqlstate,
details=details,
)
# XXX: shorten the traceback a bit (and avoid exposing _lib). We
# could also define the exceptions in __init__ but then we'd have a
# circular import situation
Error.__module__ = "adbc_driver_manager"
InterfaceError.__module__ = "adbc_driver_manager"
DatabaseError.__module__ = "adbc_driver_manager"
DataError.__module__ = "adbc_driver_manager"
OperationalError.__module__ = "adbc_driver_manager"
IntegrityError.__module__ = "adbc_driver_manager"
InternalError.__module__ = "adbc_driver_manager"
ProgrammingError.__module__ = "adbc_driver_manager"
NotSupportedError.__module__ = "adbc_driver_manager"
INGEST_OPTION_MODE = ADBC_INGEST_OPTION_MODE.decode("utf-8")
INGEST_OPTION_MODE_APPEND = ADBC_INGEST_OPTION_MODE_APPEND.decode("utf-8")
INGEST_OPTION_MODE_CREATE = ADBC_INGEST_OPTION_MODE_CREATE.decode("utf-8")
INGEST_OPTION_MODE_REPLACE = ADBC_INGEST_OPTION_MODE_REPLACE.decode("utf-8")
INGEST_OPTION_MODE_CREATE_APPEND = ADBC_INGEST_OPTION_MODE_CREATE_APPEND.decode("utf-8")
INGEST_OPTION_TARGET_TABLE = ADBC_INGEST_OPTION_TARGET_TABLE.decode("utf-8")
cdef object convert_error(CAdbcStatusCode status, CAdbcError* error):
cdef CAdbcErrorDetail c_detail
if status == ADBC_STATUS_OK:
return None
message = CAdbcStatusCodeMessage(status).decode("utf-8")
vendor_code = None
sqlstate = None
details = []
if error != NULL:
if error.message != NULL:
message += ": "
message += error.message.decode("utf-8", "replace")
if (
error.vendor_code and
error.vendor_code != ADBC_ERROR_VENDOR_CODE_PRIVATE_DATA
):
vendor_code = error.vendor_code
message += f". Vendor code: {vendor_code}"
if error.sqlstate[0] != 0:
sqlstate = bytes(error.sqlstate[i] for i in range(5))
sqlstate = sqlstate.decode("ascii", "replace")
message += f". SQLSTATE: {sqlstate}"
num_details = AdbcErrorGetDetailCount(error)
for index in range(num_details):
c_detail = AdbcErrorGetDetail(error, index)
if c_detail.key == NULL or c_detail.value == NULL:
# Shouldn't happen...
break
details.append(
(c_detail.key,
PyBytes_FromStringAndSize(
<const char*> c_detail.value,
c_detail.value_length)))
if error.release:
error.release(error)
klass = Error
if status in (ADBC_STATUS_INVALID_DATA,):
klass = DataError
elif status in (
ADBC_STATUS_IO,
ADBC_STATUS_CANCELLED,
ADBC_STATUS_TIMEOUT,
ADBC_STATUS_UNKNOWN,
):
klass = OperationalError
elif status in (ADBC_STATUS_INTEGRITY,):
klass = IntegrityError
elif status in (ADBC_STATUS_INTERNAL,):
klass = InternalError
elif status in (ADBC_STATUS_ALREADY_EXISTS,
ADBC_STATUS_INVALID_ARGUMENT,
ADBC_STATUS_INVALID_STATE,
ADBC_STATUS_NOT_FOUND,
ADBC_STATUS_UNAUTHENTICATED,
ADBC_STATUS_UNAUTHORIZED):
klass = ProgrammingError
elif status == ADBC_STATUS_NOT_IMPLEMENTED:
return NotSupportedError(
message,
vendor_code=vendor_code,
sqlstate=sqlstate,
details=details,
)
return klass(
message,
status_code=status,
vendor_code=vendor_code,
sqlstate=sqlstate,
details=details,
)
cdef void check_error(CAdbcStatusCode status, CAdbcError* error) except *:
if status == ADBC_STATUS_OK:
return
raise convert_error(status, error)
cdef CAdbcError empty_error():
cdef CAdbcError error
memset(&error, 0, cython.sizeof(error))
# We always want the extended error info
error.vendor_code = ADBC_ERROR_VENDOR_CODE_PRIVATE_DATA
return error
cdef bytes _to_bytes(obj, str name):
if isinstance(obj, bytes):
return obj
elif isinstance(obj, str):
return obj.encode("utf-8")
raise ValueError(f"{name} must be str or bytes")
def _test_error(status_code, message, vendor_code, sqlstate) -> Error:
cdef CAdbcError error
error.release = NULL
message = _to_bytes(message, "message")
error.message = message
if vendor_code:
error.vendor_code = vendor_code
else:
error.vendor_code = 0
if sqlstate:
sqlstate = sqlstate.encode("ascii")
else:
sqlstate = b"\0\0\0\0\0"
for i in range(5):
error.sqlstate[i] = sqlstate[i]
return check_error(AdbcStatusCode(status_code), &error)
cdef class _AdbcHandle:
"""
Base class for ADBC handles, which are context managers.
"""
cdef:
size_t _open_children
object _lock
str _child_type
def __init__(self, str child_type):
self._lock = threading.Lock()
self._child_type = child_type
def __enter__(self) -> "Self":
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.close()
cdef _open_child(self):
with self._lock:
self._open_children += 1
cdef _close_child(self):
with self._lock:
if self._open_children == 0:
raise RuntimeError(
f"Underflow in closing this {self._child_type}")
self._open_children -= 1
cdef _check_open_children(self):
with self._lock:
if self._open_children != 0:
raise RuntimeError(
f"Cannot close {self.__class__.__name__} "
f"with open {self._child_type}")
cdef void pycapsule_schema_deleter(object capsule) noexcept:
cdef CArrowSchema* allocated = <CArrowSchema*>PyCapsule_GetPointer(
capsule, "arrow_schema"
)
if allocated.release != NULL:
allocated.release(allocated)
Loading ...