Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
kiara / registries / __init__.py
Size: Mime:
# -*- coding: utf-8 -*-

#  Copyright (c) 2021, University of Luxembourg / DHARPA project
#  Copyright (c) 2021, Markus Binsteiner
#
#  Mozilla Public License, version 2.0 (see LICENSE or https://www.mozilla.org/en-US/MPL/2.0/)

import abc
import os
import uuid
from enum import Enum
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Generic,
    Iterable,
    Mapping,
    Type,
    TypeVar,
    Union,
)

import structlog
from pydantic import BaseModel, ConfigDict, Field, RootModel

from kiara.utils import log_message

try:
    from typing import Literal  # type: ignore
except ImportError:
    from typing_extensions import Literal  # type: ignore
try:
    from typing import Self  # type: ignore
except ImportError:
    from typing_extensions import Self  # type: ignore

if TYPE_CHECKING:
    from kiara.context import Kiara


class ArchiveConfig(BaseModel, abc.ABC):
    @classmethod
    @abc.abstractmethod
    def create_new_store_config(cls, store_base_path: str, **kwargs) -> Self:
        raise NotImplementedError(
            f"Store config type '{cls}' does not implement 'create_new_config'."
        )

    model_config = ConfigDict()

    # @abc.abstractmethod
    # def get_archive_id(self) -> uuid.UUID:
    #     raise NotImplementedError(
    #         f"Store config type '{self.__class__.__name__}' does not implement 'get_archive_id'."
    #     )


ARCHIVE_CONFIG_CLS = TypeVar("ARCHIVE_CONFIG_CLS", bound=ArchiveConfig)


logger = structlog.getLogger()


class ArchiveDetails(BaseModel):

    size: Union[int, None] = Field(
        description="The size of the stored archive.", default=None
    )


class ArchiveMetadata(RootModel):
    root: Mapping[str, Any]

    def __iter__(self):
        return iter(self.root)

    def __getitem__(self, item):
        return self.root[item]

    def __setitem__(self, key, value):
        self.root[key] = value

    def get(self, key, default=None):
        return self.root.get(key, default)

    # archive_id: Union[uuid.UUID, None] = Field(
    #     description="The id of the stored archive.", default=None
    # )
    # custom_metadata: Dict[str, Any] = Field(
    #     description="Custom metadata for the archive.", default_factory=dict
    # )


NON_ARCHIVE_DETAILS = ArchiveDetails()


class KiaraArchive(abc.ABC, Generic[ARCHIVE_CONFIG_CLS]):

    _config_cls: Type[ARCHIVE_CONFIG_CLS] = None  # type: ignore

    # @classmethod
    # def create_store_config_instance(
    #     cls, config: Union[ARCHIVE_CONFIG_CLS, BaseModel, Mapping[str, Any]]
    # ) -> "BaseArchive":
    #     """Create a store config instance from a config instance of a few different types."""
    #
    #     from kiara.context.config import KiaraArchiveConfig
    #
    #     if isinstance(config, cls._config_cls):
    #         config = config
    #     elif isinstance(config, KiaraArchiveConfig):
    #         config = cls._config_cls(**config.config)
    #     elif isinstance(config, BaseModel):
    #         config = cls._config_cls(**config.model_dump())
    #     elif isinstance(config, Mapping):
    #         config = cls._config_cls(**config)
    #
    #     return config

    # @classmethod
    # def is_valid_archive(cls, store_uri: str, **kwargs: Any) -> bool:
    #     return False

    @classmethod
    def _load_archive_config(
        cls, archive_uri: str, allow_write_access: bool, **kwargs
    ) -> Union[Dict[str, Any], None]:
        """Tries to assemble an archive config from an uri (and optional paramters).

        If the archive type supports the archive at the uri, then a valid config will be returned,
        otherwise 'None'.
        """

        return None

    @classmethod
    def load_archive_config(
        cls, archive_uri: str, allow_write_access: bool, **kwargs
    ) -> Union[Dict[str, Any], None]:

        log_message(
            "attempt_loading_existing_store",
            archive_uri=archive_uri,
            archive_type=cls.__name__,
        )

        return cls._load_archive_config(
            archive_uri=archive_uri, allow_write_access=allow_write_access, **kwargs
        )

    @classmethod
    def create_new_store_config(
        cls, store_base_path: str, **kwargs
    ) -> ARCHIVE_CONFIG_CLS:

        log_message(
            "create_new_store",
            store_base_path=store_base_path,
            store_type=cls.__name__,
        )

        Path(store_base_path).mkdir(parents=True, exist_ok=True)

        archive_config: ARCHIVE_CONFIG_CLS = cls._config_cls.create_new_store_config(
            store_base_path=store_base_path, **kwargs
        )
        return archive_config

    def __init__(
        self,
        archive_config: ARCHIVE_CONFIG_CLS,
        force_read_only: bool = False,
        archive_alias: Union[str, None] = None,
    ):

        self._archive_alias: Union[str, None] = archive_alias
        self._config: ARCHIVE_CONFIG_CLS = archive_config
        self._force_read_only: bool = force_read_only

        self._archive_metadata: Union[ArchiveMetadata, None] = None

    @property
    def archive_metadata(self) -> ArchiveMetadata:

        if self._archive_metadata is None:
            archive_metadata = self._retrieve_archive_metadata()
            self._archive_metadata = ArchiveMetadata(root=archive_metadata)

        return self._archive_metadata

    @classmethod
    @abc.abstractmethod
    def supported_item_types(cls) -> Iterable[str]:
        pass

    @classmethod
    @abc.abstractmethod
    def _is_writeable(cls) -> bool:
        pass

    @abc.abstractmethod
    def register_archive(self, kiara: "Kiara"):
        pass

    @abc.abstractmethod
    def _retrieve_archive_metadata(self) -> Mapping[str, Any]:
        """Retrieve metadata for the archive.

        Must contain at least one key 'archive_id', with a uuid-able value that
        uniquely identifies the archive.
        """

        raise NotImplementedError()

    def get_archive_metadata(self, key: str) -> Any:

        return self.archive_metadata.get(key, None)

    def set_archive_metadata_value(self, key: str, value: Any):

        if not self.is_writeable():
            raise Exception("Can't set metadata on read-only archive.")

        self._set_archive_metadata_value(key, value)
        self.archive_metadata[key] = value

    def _set_archive_metadata_value(self, key: str, value: Any):
        """Set custom metadata for the archive."""

        raise NotImplementedError(
            f"This archive type '{type(self.__class__)}' does not support setting metadata."
        )

    @property
    def archive_alias(self) -> str:
        if self._archive_alias:
            return self._archive_alias

        alias = self.get_archive_metadata("archive_alias")
        if not alias:
            alias = str(self.archive_id)
        self._archive_alias = alias
        return self._archive_alias  # type: ignore

    def is_force_read_only(self) -> bool:
        return self._force_read_only

    def set_force_read_only(self, force_read_only: bool):
        self._force_read_only = force_read_only

    def is_writeable(self) -> bool:
        if self._force_read_only:
            return False
        return self.__class__._is_writeable()

    # @abc.abstractmethod
    # def register_archive(self, kiara: "Kiara"):
    #     pass

    @property
    def archive_id(self) -> uuid.UUID:

        try:
            result = self.archive_metadata["archive_id"]
        except KeyError:
            raise Exception("Archive does not have an id metadata value set.")

        return uuid.UUID(result)

    @property
    def config(self) -> ARCHIVE_CONFIG_CLS:
        return self._config

    def get_archive_details(self) -> ArchiveDetails:
        return NON_ARCHIVE_DETAILS

    def delete_archive(self, archive_id: Union[uuid.UUID, None] = None):

        if archive_id != self.archive_id:
            raise Exception(
                f"Not deleting archive with id '{self.archive_id}': confirmation id '{archive_id}' does not match."
            )

        logger.info(
            "deleteing.archive",
            archive_id=self.archive_id,
            item_types=self.supported_item_types(),
            archive_type=self.__class__.__name__,
        )
        self._delete_archive()

    @abc.abstractmethod
    def _delete_archive(self):
        pass

    def __hash__(self):

        return hash(self.__class__) + hash(self.archive_id)

    def __eq__(self, other):

        if self.__class__ != other.__class__:
            return False

        return self.archive_id == other.archive_id


class BaseArchive(KiaraArchive[ARCHIVE_CONFIG_CLS], Generic[ARCHIVE_CONFIG_CLS]):
    """A base class that can be used to implement a kiara archive."""

    def __init__(
        self,
        archive_alias: str,
        archive_config: ARCHIVE_CONFIG_CLS,
        force_read_only: bool = False,
    ):

        super().__init__(
            archive_alias=archive_alias,
            archive_config=archive_config,
            force_read_only=force_read_only,
        )
        self._kiara: Union["Kiara", None] = None

    @classmethod
    def _is_writeable(cls) -> bool:
        return False

    @property
    def kiara_context(self) -> "Kiara":
        if self._kiara is None:
            raise Exception("Archive not registered into a kiara context yet.")
        return self._kiara

    def register_archive(self, kiara: "Kiara"):
        if self._kiara is not None:
            raise Exception("Archive already registered in a context.")
        self._kiara = kiara

    def _delete_archive(self):

        logger.info(
            "ignore.archive_delete_request",
            reason="not implemented/applicable",
            archive_id=self.archive_id,
            item_types=self.supported_item_types(),
            archive_type=self.__class__.__name__,
        )


class FileSystemArchiveConfig(ArchiveConfig):
    @classmethod
    def load_store_config(cls, store_uri: str, **kwargs) -> Self:
        raise NotImplementedError(
            f"Store config type '{cls}' does not implement 'create_config'."
        )

    @classmethod
    def create_new_store_config(
        cls, store_base_path: str, **kwargs
    ) -> "FileSystemArchiveConfig":

        store_id = str(uuid.uuid4())
        if "path" in kwargs:
            file_name = kwargs["path"]
        else:
            file_name = store_id

        archive_path = os.path.abspath(os.path.join(store_base_path, file_name))

        return FileSystemArchiveConfig(archive_path=archive_path)

    archive_path: str = Field(
        description="The path where the data for this archive is stored."
    )


class SqliteArchiveConfig(ArchiveConfig):
    @classmethod
    def create_new_store_config(
        cls, store_base_path: str, **kwargs
    ) -> "SqliteArchiveConfig":

        store_id = str(uuid.uuid4())

        if "file_name" in kwargs:
            file_name = kwargs["file_name"]
        else:
            file_name = f"{store_id}.sqlite"

        archive_path = os.path.abspath(os.path.join(store_base_path, file_name))

        import sqlite3

        if not os.path.exists(archive_path):
            Path(archive_path).parent.mkdir(exist_ok=True, parents=True)

        conn = sqlite3.connect(archive_path)

        # Create a cursor object
        c = conn.cursor()

        # Create table
        c.execute(
            """CREATE TABLE IF NOT EXISTS archive_metadata
                     (key text PRIMARY KEY , value text NOT NULL)"""
        )

        # Insert a row of data
        c.execute(
            "INSERT OR IGNORE INTO archive_metadata VALUES ('archive_id', ?)",
            (store_id,),
        )

        # Save (commit) the changes
        conn.commit()

        # Close the connection
        conn.close()

        return SqliteArchiveConfig(sqlite_db_path=archive_path)

    sqlite_db_path: str = Field(
        description="The path where the data for this archive is stored."
    )


class CHUNK_COMPRESSION_TYPE(Enum):
    NONE = 0
    ZSTD = 1
    LZMA = 2
    LZ4 = 3


DEFAULT_CHUNK_COMPRESSION = "zstd"


class SqliteDataStoreConfig(SqliteArchiveConfig):
    @classmethod
    def create_new_store_config(
        cls, store_base_path: str, **kwargs
    ) -> "SqliteDataStoreConfig":

        store_id = str(uuid.uuid4())

        if "file_name" in kwargs:
            file_name = kwargs["file_name"]
        else:
            file_name = f"{store_id}.sqlite"

        default_chunk_compression = kwargs.get(
            "default_chunk_compression", DEFAULT_CHUNK_COMPRESSION
        )

        archive_path = os.path.abspath(os.path.join(store_base_path, file_name))

        if os.path.exists(archive_path):
            raise Exception(f"Archive path '{archive_path}' already exists.")

        Path(archive_path).parent.mkdir(exist_ok=True, parents=True)

        # Connect to the SQLite database (or create it if it doesn't exist)
        import sqlite3

        conn = sqlite3.connect(archive_path)

        # Create a cursor object
        c = conn.cursor()

        # Create table
        c.execute(
            """CREATE TABLE archive_metadata
                     (key text PRIMARY KEY , value text NOT NULL)"""
        )

        # Insert a row of data
        c.execute("INSERT INTO archive_metadata VALUES ('archive_id', ?)", (store_id,))

        # Save (commit) the changes
        conn.commit()

        # Close the connection
        conn.close()

        return SqliteDataStoreConfig(
            sqlite_db_path=archive_path,
            default_chunk_compression=default_chunk_compression,
        )

    default_chunk_compression: Literal["none", "lz4", "zstd", "lzma"] = Field(  # type: ignore
        description="The default compression type to use for data in this store.",
        default=DEFAULT_CHUNK_COMPRESSION,
    )