Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
getdaft / catalog / __unity.py
Size: Mime:
"""WARNING! These APIs are internal; please use Catalog.from_unity() and Table.from_unity()."""

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING

from daft.catalog import Catalog, Identifier, Table
from daft.io._deltalake import read_deltalake
from daft.unity_catalog import UnityCatalog as InnerCatalog  # noqa: TID253
from daft.unity_catalog import UnityCatalogTable as InnerTable  # noqa: TID253

if TYPE_CHECKING:
    from daft.dataframe import DataFrame


class UnityCatalog(Catalog):
    _inner: InnerCatalog

    def __init__(self, unity_catalog: InnerCatalog):
        """DEPRECATED: Please use `Catalog.from_unity`; version 0.5.0!"""
        warnings.warn(
            "This is deprecated and will be removed in daft >= 0.5.0, please prefer using `Catalog.from_unity` instead; version 0.5.0!",
            category=DeprecationWarning,
        )
        self._inner = unity_catalog

    @property
    def name(self) -> str:
        # TODO feat: add names to unity catalogs
        return "unity"

    @staticmethod
    def _from_obj(obj: object) -> UnityCatalog:
        """Returns an UnityCatalog instance if the given object can be adapted so."""
        if isinstance(obj, InnerCatalog):
            c = UnityCatalog.__new__(UnityCatalog)
            c._inner = obj
            return c
        raise ValueError(f"Unsupported unity catalog type: {type(obj)}")

    ###
    # get_*
    ###

    def get_table(self, ident: Identifier | str) -> UnityTable:
        if isinstance(ident, Identifier):
            ident = ".".join(ident)  # TODO unity qualified identifiers
        return UnityTable(self._inner.load_table(ident))

    ###
    # list_.*
    ###

    def list_tables(self, pattern: str | None = None) -> list[str]:
        if pattern is None or pattern == "":
            return [
                tbl
                for cat in self._inner.list_catalogs()
                for schema in self._inner.list_schemas(cat)
                for tbl in self._inner.list_tables(schema)
            ]
        num_namespaces = pattern.count(".")
        if num_namespaces == 0:
            catalog_name = pattern
            return [tbl for schema in self._inner.list_schemas(catalog_name) for tbl in self._inner.list_tables(schema)]
        elif num_namespaces == 1:
            schema_name = pattern
            return [tbl for tbl in self._inner.list_tables(schema_name)]
        else:
            raise ValueError(
                f"Unrecognized catalog name or schema name, expected a '.'-separated namespace but received: {pattern}"
            )


class UnityTable(Table):
    _inner: InnerTable

    _read_options = {"version"}
    _write_options = {
        "schema_mode",
        "partition_col",
        "description",
        "configuration",
        "custom_metadata",
        "dynamo_table_name",
        "allow_unsafe_rename",
    }

    def __init__(self, unity_table: InnerTable):
        """DEPRECATED: Please use `Table.from_unity`; version 0.5.0!"""
        warnings.warn(
            "This is deprecated and will be removed in daft >= 0.5.0, please prefer using `Table.from_unity` instead; version 0.5.0!",
            category=DeprecationWarning,
        )
        self._inner = unity_table

    @property
    def name(self) -> str:
        return self._inner.table_info.name

    @staticmethod
    def _from_obj(obj: object) -> UnityTable | None:
        """Returns a UnityTable if the given object can be adapted so."""
        if isinstance(obj, InnerTable):
            t = UnityTable.__new__(UnityTable)
            t._inner = obj
            return t
        raise ValueError(f"Unsupported unity table type: {type(obj)}")

    @staticmethod
    def _try_from(obj: object) -> UnityTable | None:
        """Returns an UnityTable if the given object can be adapted so."""
        if isinstance(obj, InnerTable):
            return UnityTable(obj)
        return None

    ###
    # read methods
    ###

    def read(self, **options) -> DataFrame:
        Table._validate_options("Unity read", options, UnityTable._read_options)

        return read_deltalake(self._inner, version=options.get("version"))

    ###
    # write methods
    ###

    def write(self, df: DataFrame | object, mode: str = "append", **options):
        self._validate_options("Unity write", options, UnityTable._write_options)

        return df.write_deltalake(
            self._inner,
            mode=mode,
            schema_mode=options.get("schema_mode"),
            partition_cols=options.get("partition_cols"),
            description=options.get("description"),
            configuration=options.get("configuration"),
            custom_metadata=options.get("custom_metadata"),
            dynamo_table_name=options.get("dynamo_table_name"),
            allow_unsafe_rename=options.get("allow_unsafe_rename", False),
        )