Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-adbc-nightlies / adbc-driver-manager   python

Repository URL to install this package:

Version: 1.3.0 

/ _lib.pyx

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3

"""Low-level ADBC API."""

import enum
import functools
import threading
import os
import typing
import sys
import warnings
from typing import List, Optional, Tuple

import cython
from cpython.bytes cimport PyBytes_FromStringAndSize
from cpython.pycapsule cimport (
    PyCapsule_GetPointer, PyCapsule_New, PyCapsule_CheckExact
)
from libc.stdint cimport int64_t, uint8_t, uint32_t, uintptr_t
from libc.stdlib cimport malloc, free
from libc.string cimport memcpy, memset
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector

if typing.TYPE_CHECKING:
    from typing import Self  # no-cython-lint


class AdbcStatusCode(enum.IntEnum):
    """
    A status code indicating the type of error.

    An ``IntEnum`` mirroring the ADBC C status codes.  Member values come
    from the ``ADBC_STATUS_*`` constants, which are declared outside this
    view (presumably in the accompanying .pxd).  ``OK`` denotes success;
    every other member denotes a failure category.  ``Error`` normalizes
    its ``status_code`` attribute to a member of this enum.
    """

    OK = ADBC_STATUS_OK
    UNKNOWN = ADBC_STATUS_UNKNOWN
    NOT_IMPLEMENTED = ADBC_STATUS_NOT_IMPLEMENTED
    NOT_FOUND = ADBC_STATUS_NOT_FOUND
    ALREADY_EXISTS = ADBC_STATUS_ALREADY_EXISTS
    INVALID_ARGUMENT = ADBC_STATUS_INVALID_ARGUMENT
    INVALID_STATE = ADBC_STATUS_INVALID_STATE
    INVALID_DATA = ADBC_STATUS_INVALID_DATA
    INTEGRITY = ADBC_STATUS_INTEGRITY
    INTERNAL = ADBC_STATUS_INTERNAL
    IO = ADBC_STATUS_IO
    CANCELLED = ADBC_STATUS_CANCELLED
    TIMEOUT = ADBC_STATUS_TIMEOUT
    UNAUTHENTICATED = ADBC_STATUS_UNAUTHENTICATED
    UNAUTHORIZED = ADBC_STATUS_UNAUTHORIZED


class AdbcInfoCode(enum.IntEnum):
    """
    Integer codes naming driver/vendor metadata items.

    Covers the vendor name and version, the driver name and version, and
    the Arrow versions used by each.  Member values come from the
    ``ADBC_INFO_*`` constants, which are declared outside this view
    (presumably in the accompanying .pxd).
    """

    VENDOR_NAME = ADBC_INFO_VENDOR_NAME
    VENDOR_VERSION = ADBC_INFO_VENDOR_VERSION
    VENDOR_ARROW_VERSION = ADBC_INFO_VENDOR_ARROW_VERSION
    DRIVER_NAME = ADBC_INFO_DRIVER_NAME
    DRIVER_VERSION = ADBC_INFO_DRIVER_VERSION
    DRIVER_ARROW_VERSION = ADBC_INFO_DRIVER_ARROW_VERSION


class Warning(UserWarning):
    """
    PEP 249-compliant base warning class.

    Derives from ``UserWarning``.  Note that, inside this module, this
    class shadows the builtin ``Warning`` name.
    """


class Error(Exception):
    """
    PEP 249-compliant base exception class.

    Instances (including those of every subclass) can be pickled and
    copied; the constructor is bypassed on restore, and all attributes
    below are carried over.

    Attributes
    ----------
    status_code : AdbcStatusCode
        The original ADBC status code.
    vendor_code : int, optional
        A vendor-specific status code if present.
    sqlstate : str, optional
        The SQLSTATE code if present.
    details : list[tuple[str, bytes]], optional
        Additional error details, if present.
    """

    def __init__(
        self,
        message,
        *,
        status_code,
        vendor_code=None,
        sqlstate=None,
        details=None,
    ):
        super().__init__(message)
        # Normalize raw integer codes to the enum (raises ValueError for
        # values that are not known status codes).
        self.status_code = AdbcStatusCode(status_code)
        self.vendor_code = vendor_code
        self.sqlstate = sqlstate
        # A fresh list per instance; never share a mutable default.
        self.details = details or []

    def __reduce__(self):
        # BaseException.__reduce__ rebuilds the instance as cls(*self.args),
        # which fails here: ``status_code`` is a required keyword-only
        # argument (and NotSupportedError does not accept it at all).  So
        # allocate via cls.__new__(cls, *args) -- which also restores
        # ``args`` -- and let BaseException.__setstate__ restore
        # status_code/vendor_code/sqlstate/details from __dict__.  This keeps
        # errors usable with pickle (e.g. process pools) and copy.copy().
        import copyreg

        return (copyreg.__newobj__, (type(self), *self.args), self.__dict__)


class InterfaceError(Error):
    """
    Errors related to the database interface.

    Not created by ``convert_error`` in this module, which maps ADBC
    status codes onto the other subclasses.
    """


class DatabaseError(Error):
    """
    Errors related to the database.

    Common base of DataError, OperationalError, IntegrityError,
    InternalError, ProgrammingError and NotSupportedError.
    """


class DataError(DatabaseError):
    """
    Errors related to processed data.

    ``convert_error`` produces this for ADBC_STATUS_INVALID_DATA.
    """


class OperationalError(DatabaseError):
    """
    Errors related to database operation, not under user control.

    ``convert_error`` produces this for ADBC_STATUS_IO,
    ADBC_STATUS_CANCELLED, ADBC_STATUS_TIMEOUT and ADBC_STATUS_UNKNOWN.
    """


class IntegrityError(DatabaseError):
    """
    Errors related to relational integrity.

    ``convert_error`` produces this for ADBC_STATUS_INTEGRITY.
    """


class InternalError(DatabaseError):
    """
    Errors related to database-internal errors.

    ``convert_error`` produces this for ADBC_STATUS_INTERNAL.
    """


class ProgrammingError(DatabaseError):
    """
    Errors related to user errors.

    ``convert_error`` produces this for ADBC_STATUS_ALREADY_EXISTS,
    ADBC_STATUS_INVALID_ARGUMENT, ADBC_STATUS_INVALID_STATE,
    ADBC_STATUS_NOT_FOUND, ADBC_STATUS_UNAUTHENTICATED and
    ADBC_STATUS_UNAUTHORIZED.
    """


class NotSupportedError(DatabaseError):
    """
    An operation or some functionality is not supported.

    The status code is always ``AdbcStatusCode.NOT_IMPLEMENTED``, so the
    constructor takes no ``status_code`` argument; the remaining keyword
    arguments are forwarded to ``Error`` unchanged.
    """

    def __init__(self, message, *, vendor_code=None, sqlstate=None, details=None):
        forwarded = dict(
            vendor_code=vendor_code,
            sqlstate=sqlstate,
            details=details,
        )
        super().__init__(
            message, status_code=AdbcStatusCode.NOT_IMPLEMENTED, **forwarded
        )


# XXX: shorten the traceback a bit (and avoid exposing _lib).  We
# could also define the exceptions in __init__ but then we'd have a
# circular import situation.
#
# Warning is included so that every PEP 249 class defined above reports
# the same public module.
# NOTE(review): this assumes each of these names is re-exported from the
# adbc_driver_manager package (pickle resolves classes via __module__).
Warning.__module__ = "adbc_driver_manager"
Error.__module__ = "adbc_driver_manager"
InterfaceError.__module__ = "adbc_driver_manager"
DatabaseError.__module__ = "adbc_driver_manager"
DataError.__module__ = "adbc_driver_manager"
OperationalError.__module__ = "adbc_driver_manager"
IntegrityError.__module__ = "adbc_driver_manager"
InternalError.__module__ = "adbc_driver_manager"
ProgrammingError.__module__ = "adbc_driver_manager"
NotSupportedError.__module__ = "adbc_driver_manager"


# Python str copies of the ADBC_INGEST_OPTION_* constants, decoded as UTF-8.
# The source constants are declared outside this view (presumably C string
# constants in the .pxd); judging by their names they are bulk-ingestion
# option keys (mode, target table) and the accepted mode values.
INGEST_OPTION_MODE = ADBC_INGEST_OPTION_MODE.decode("utf-8")
INGEST_OPTION_MODE_APPEND = ADBC_INGEST_OPTION_MODE_APPEND.decode("utf-8")
INGEST_OPTION_MODE_CREATE = ADBC_INGEST_OPTION_MODE_CREATE.decode("utf-8")
INGEST_OPTION_MODE_REPLACE = ADBC_INGEST_OPTION_MODE_REPLACE.decode("utf-8")
INGEST_OPTION_MODE_CREATE_APPEND = ADBC_INGEST_OPTION_MODE_CREATE_APPEND.decode("utf-8")
INGEST_OPTION_TARGET_TABLE = ADBC_INGEST_OPTION_TARGET_TABLE.decode("utf-8")


cdef object convert_error(CAdbcStatusCode status, CAdbcError* error):
    # Translate an ADBC status code plus optional AdbcError into the
    # matching PEP 249 exception instance (returned, not raised).
    #
    # Returns None for ADBC_STATUS_OK.  When ``error`` is non-NULL, its
    # message, vendor code, SQLSTATE and details are copied into the
    # exception.  The AdbcError is then released if it has a release
    # callback, even if copying fails part-way.
    cdef CAdbcErrorDetail c_detail

    if status == ADBC_STATUS_OK:
        return None

    message = CAdbcStatusCodeMessage(status).decode("utf-8")
    vendor_code = None
    sqlstate = None
    details = []

    if error != NULL:
        try:
            if error.message != NULL:
                message += ": "
                message += error.message.decode("utf-8", "replace")
            # ADBC_ERROR_VENDOR_CODE_PRIVATE_DATA is a sentinel (set by
            # empty_error to request extended error info), not a real code.
            if (
                    error.vendor_code and
                    error.vendor_code != ADBC_ERROR_VENDOR_CODE_PRIVATE_DATA
            ):
                vendor_code = error.vendor_code
                message += f". Vendor code: {vendor_code}"
            if error.sqlstate[0] != 0:
                # Copy the raw 5-byte buffer directly.  Building it with
                # bytes(<generator of char>) raises ValueError when ``char``
                # is signed and a byte is >= 0x80 (it becomes a negative int);
                # this way non-ASCII bytes reach the "replace" decoder as
                # intended.
                sqlstate = PyBytes_FromStringAndSize(error.sqlstate, 5)
                sqlstate = sqlstate.decode("ascii", "replace")
                message += f". SQLSTATE: {sqlstate}"

            num_details = AdbcErrorGetDetailCount(error)
            for index in range(num_details):
                c_detail = AdbcErrorGetDetail(error, index)
                if c_detail.key == NULL or c_detail.value == NULL:
                    # Shouldn't happen...
                    break
                # NOTE(review): the key is converted from a C string as-is;
                # Error documents details as list[tuple[str, bytes]] -- confirm
                # whether the key arrives as str or bytes under this module's
                # C-string conversion directives.
                details.append(
                    (c_detail.key,
                     PyBytes_FromStringAndSize(
                         <const char*> c_detail.value,
                         c_detail.value_length)))
        finally:
            # Always hand the AdbcError back to its owner, even if building
            # the message/details above raised.
            if error.release:
                error.release(error)

    klass = Error
    if status in (ADBC_STATUS_INVALID_DATA,):
        klass = DataError
    elif status in (
        ADBC_STATUS_IO,
        ADBC_STATUS_CANCELLED,
        ADBC_STATUS_TIMEOUT,
        ADBC_STATUS_UNKNOWN,
    ):
        klass = OperationalError
    elif status in (ADBC_STATUS_INTEGRITY,):
        klass = IntegrityError
    elif status in (ADBC_STATUS_INTERNAL,):
        klass = InternalError
    elif status in (ADBC_STATUS_ALREADY_EXISTS,
                    ADBC_STATUS_INVALID_ARGUMENT,
                    ADBC_STATUS_INVALID_STATE,
                    ADBC_STATUS_NOT_FOUND,
                    ADBC_STATUS_UNAUTHENTICATED,
                    ADBC_STATUS_UNAUTHORIZED):
        klass = ProgrammingError
    elif status == ADBC_STATUS_NOT_IMPLEMENTED:
        # NotSupportedError fixes its own status code.
        return NotSupportedError(
            message,
            vendor_code=vendor_code,
            sqlstate=sqlstate,
            details=details,
        )
    return klass(
        message,
        status_code=status,
        vendor_code=vendor_code,
        sqlstate=sqlstate,
        details=details,
    )


cdef void check_error(CAdbcStatusCode status, CAdbcError* error) except *:
    # Raise the exception built by convert_error for any non-OK status.
    # convert_error also releases a non-NULL ``error`` that has a release
    # callback.  A successful status is a no-op.
    if status != ADBC_STATUS_OK:
        raise convert_error(status, error)


cdef CAdbcError empty_error():
    # Produce an AdbcError with every field zeroed (so message and release
    # are NULL), tagged with the private-data sentinel vendor code so that
    # drivers fill in extended error info.
    cdef CAdbcError blank
    memset(&blank, 0, cython.sizeof(blank))
    blank.vendor_code = ADBC_ERROR_VENDOR_CODE_PRIVATE_DATA
    return blank


cdef bytes _to_bytes(obj, str name):
    # Coerce ``obj`` to bytes: str is UTF-8 encoded, bytes pass through
    # unchanged, and any other type raises ValueError naming ``name``.
    if isinstance(obj, str):
        return obj.encode("utf-8")
    if not isinstance(obj, bytes):
        raise ValueError(f"{name} must be str or bytes")
    return obj


def _test_error(status_code, message, vendor_code, sqlstate) -> Error:
    """
    Testing hook: build a synthetic AdbcError and raise it via check_error.

    Parameters
    ----------
    status_code : int or AdbcStatusCode
        Status to report; passed through ``AdbcStatusCode``.
    message : str or bytes
        Error message (str is UTF-8 encoded).
    vendor_code : int, optional
        Vendor code; a falsy value is stored as 0.
    sqlstate : str, optional
        SQLSTATE; ASCII-encoded, truncated or NUL-padded to 5 bytes.
        A falsy value stores all NULs (no SQLSTATE).

    Raises
    ------
    Error
        The converted exception, for any status other than OK.  For OK,
        nothing is raised and None is returned.
    """
    cdef CAdbcError error
    # Zero the whole struct, including any fields not set below, so that
    # convert_error never reads uninitialized memory.
    memset(&error, 0, cython.sizeof(error))
    error.release = NULL

    message = _to_bytes(message, "message")
    # error.message borrows the buffer of ``message``, which stays alive
    # until this function returns, i.e. across the check_error call.
    error.message = message

    if vendor_code:
        error.vendor_code = vendor_code
    else:
        error.vendor_code = 0

    if sqlstate:
        sqlstate = sqlstate.encode("ascii")
    else:
        sqlstate = b""
    # Normalize to exactly 5 bytes so short codes don't raise IndexError.
    sqlstate = sqlstate[:5].ljust(5, b"\0")
    for i in range(5):
        error.sqlstate[i] = sqlstate[i]

    # check_error returns nothing; it raises for any non-OK status.
    check_error(AdbcStatusCode(status_code), &error)


cdef class _AdbcHandle:
    """
    Shared base for the ADBC handle wrappers.

    Instances are context managers (leaving the ``with`` block calls
    ``close()``).  They also keep a lock-guarded count of open child
    objects, whose kind is named by ``child_type`` in error messages.
    """

    cdef:
        size_t _open_children
        object _lock
        str _child_type

    def __init__(self, str child_type):
        self._child_type = child_type
        self._lock = threading.Lock()

    def __enter__(self) -> "Self":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # close() is not defined on this base class; presumably every
        # concrete handle subclass provides it.
        self.close()

    cdef _open_child(self):
        # Record that one more child object is open.
        lock = self._lock
        with lock:
            self._open_children = self._open_children + 1

    cdef _close_child(self):
        # Record that a child was closed.  Closing more children than were
        # opened raises RuntimeError instead of wrapping the unsigned
        # counter around.
        with self._lock:
            if not self._open_children:
                raise RuntimeError(
                    f"Underflow in closing this {self._child_type}")
            self._open_children = self._open_children - 1

    cdef _check_open_children(self):
        # Raise RuntimeError while any child is still open.
        with self._lock:
            if self._open_children:
                raise RuntimeError(
                    f"Cannot close {self.__class__.__name__} "
                    f"with open {self._child_type}")


cdef void pycapsule_schema_deleter(object capsule) noexcept:
    cdef CArrowSchema* allocated = <CArrowSchema*>PyCapsule_GetPointer(
        capsule, "arrow_schema"
    )
    if allocated.release != NULL:
        allocated.release(allocated)
Loading ...