# -*- coding: utf-8 -*-

#  Copyright (c) 2021, University of Luxembourg / DHARPA project
#  Copyright (c) 2021, Markus Binsteiner
#
#  Mozilla Public License, version 2.0 (see LICENSE or https://www.mozilla.org/en-US/MPL/2.0/)

import abc
import atexit
import hashlib
import logging
import os
import tempfile
import uuid
from datetime import datetime
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Dict,
    Generator,
    Iterable,
    List,
    Literal,
    Mapping,
    MutableMapping,
    Sequence,
    Set,
    Union,
)

import orjson
from humanfriendly import format_size
from multiformats import CID, multihash
from multiformats.multihash import Multihash
from multiformats.varint import BytesLike
from pydantic import BaseModel, ConfigDict, PrivateAttr, model_validator
from pydantic.fields import Field
from rich import box
from rich.console import Group, RenderableType
from rich.panel import Panel
from rich.rule import Rule
from rich.syntax import Syntax
from rich.table import Table

from kiara.defaults import (
    NO_MODULE_TYPE,
    NO_SERIALIZATION_MARKER,
    VOID_KIARA_ID,
    SpecialValue,
)
from kiara.exceptions import DataTypeUnknownException, InvalidValuesException
from kiara.models import KiaraModel
from kiara.models.module.manifest import InputsManifest, Manifest
from kiara.models.python_class import PythonClass
from kiara.models.values import DataTypeCharacteristics, ValueStatus
from kiara.models.values.value_schema import ValueSchema
from kiara.utils import is_jupyter, log_exception
from kiara.utils.dates import get_current_time_incl_timezone
from kiara.utils.hashing import create_cid_digest
from kiara.utils.json import orjson_dumps
from kiara.utils.yaml import StringYAML

log = logging.getLogger("kiara")
yaml = StringYAML()

if TYPE_CHECKING:
    from kiara.context import Kiara
    from kiara.data_types import DataType
    from kiara.interfaces.python_api.models.info import ValueInfo
    from kiara.models.values.lineage import ValueLineage
    from kiara.registries.data import DataRegistry


class SerializedChunks(BaseModel, abc.ABC):

    model_config = ConfigDict(extra="forbid")

    _size_cache: Union[int, None] = PrivateAttr(default=None)
    _hashes_cache: Dict[str, Sequence[CID]] = PrivateAttr(default_factory=dict)

    @abc.abstractmethod
    def get_chunks(
        self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
    ) -> Generator[Union[str, "BytesLike"], None, None]:
        """
        Retrieve the chunks belonging to this data instance.

        If 'as_files' is False, yield the chunk data as bytes. If True, store each chunk in an arbitrary
        location (or reuse an existing one) and yield the path to that file. If 'as_files' is a string,
        write the data (bytes) into a new file, using the string as its path. If 'symlink_ok' is True,
        symlinking an existing file to the requested path is also acceptable; otherwise the content is copied.
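
        Example (an illustrative sketch, using the 'SerializedBytes' subclass defined below):

            chunks = SerializedBytes(codec="raw", chunk=b"hello")
            data = next(chunks.get_chunks(as_files=False))   # -> b"hello"
            path = next(chunks.get_chunks(as_files=True))    # -> path to a temporary file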
        """

    @abc.abstractmethod
    def get_number_of_chunks(self) -> int:
        pass

    @abc.abstractmethod
    def _get_size(self) -> int:
        pass

    @abc.abstractmethod
    def _create_cids(self, hash_codec: str) -> Sequence[CID]:
        pass

    def get_size(self) -> int:

        if self._size_cache is None:
            self._size_cache = self._get_size()
        return self._size_cache

    def get_cids(self, hash_codec: str) -> Sequence[CID]:

        if self._hashes_cache.get(hash_codec, None) is None:
            self._hashes_cache[hash_codec] = self._create_cids(hash_codec=hash_codec)
        return self._hashes_cache[hash_codec]

    def _store_bytes_to_file(
        self, chunks: Iterable[bytes], file: Union[str, None] = None
    ) -> str:
        "Utility method to store bytes to a file."
        if file is None:
            file_desc, file = tempfile.mkstemp()

            def del_temp_file():
                os.remove(file)

            atexit.register(del_temp_file)

        else:
            if os.path.exists(file):
                raise Exception(f"Can't write to file, file exists: {file}")
            # note: 'os.open' takes flags (not a mode) as its second argument; create the
            # file exclusively, write-only, with 0o600 permissions
            file_desc = os.open(file, os.O_WRONLY | os.O_CREAT | os.O_EXCL, mode=0o600)

        with os.fdopen(file_desc, "wb") as tmp:
            for chunk in chunks:
                tmp.write(chunk)

        return file

    def _read_bytes_from_file(self, file: str) -> bytes:

        with open(file, "rb") as f:
            content = f.read()

        return content


class SerializedPreStoreChunks(SerializedChunks):

    codec: str = Field(
        description="The codec used to encode the chunks in this model. Using the [multicodecs](https://github.com/multiformats/multicodec) codec table."
    )

    def _create_cid_from_chunk(self, chunk: bytes, hash_codec: str) -> CID:

        # avoid shadowing the imported 'multihash' module and the 'hash' builtin
        mh = Multihash(codec=hash_codec)
        digest = mh.digest(chunk)
        return create_cid_digest(digest=digest, codec=self.codec)

    def _create_cid_from_file(self, file: str, hash_codec: str) -> CID:

        assert hash_codec == "sha2-256"

        hash_func = hashlib.sha256
        file_hash = hash_func()

        CHUNK_SIZE = 65536
        with open(file, "rb") as f:
            fb = f.read(CHUNK_SIZE)
            while len(fb) > 0:
                file_hash.update(fb)
                fb = f.read(CHUNK_SIZE)

        wrapped = multihash.wrap(file_hash.digest(), "sha2-256")
        return create_cid_digest(digest=wrapped, codec=self.codec)
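
    # Illustrative equivalence sketch for a small in-memory payload (not executed here):
    #
    #   wrapped = multihash.wrap(hashlib.sha256(b"payload").digest(), "sha2-256")
    #   cid = create_cid_digest(digest=wrapped, codec="raw")
    #
    # '_create_cid_from_file' produces the same result for a file containing b"payload",
    # while only ever holding 64 KiB of it in memory at a time.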


class SerializedBytes(SerializedPreStoreChunks):

    type: Literal["chunk"] = "chunk"
    chunk: bytes = Field(description="A byte-array")

    def get_chunks(
        self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
    ) -> Generator[Union[str, BytesLike], None, None]:

        if as_files is False:
            yield self.chunk
        else:
            if as_files is True:
                file = None
            elif isinstance(as_files, str):
                file = as_files
            else:
                assert len(as_files) == 1
                file = as_files[0]
            path = self._store_bytes_to_file([self.chunk], file=file)
            yield path

    def get_number_of_chunks(self) -> int:
        return 1

    def _get_size(self) -> int:
        return len(self.chunk)

    def _create_cids(self, hash_codec: str) -> Sequence[CID]:
        return [self._create_cid_from_chunk(self.chunk, hash_codec=hash_codec)]
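
# Illustrative usage sketch for 'SerializedBytes' (hypothetical codec/data, not executed here):
#
#   sb = SerializedBytes(codec="raw", chunk=b"hello world")
#   sb.get_size()               # -> 11
#   sb.get_cids("sha2-256")     # -> [CID(...)], cached for subsequent calls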


class SerializedListOfBytes(SerializedPreStoreChunks):

    type: Literal["chunks"] = "chunks"
    chunks: List[bytes] = Field(description="A list of byte arrays.")

    def get_chunks(
        self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
    ) -> Generator[Union[str, BytesLike], None, None]:
        if as_files is False:
            for chunk in self.chunks:
                yield chunk
        else:
            if as_files is None or as_files is True or isinstance(as_files, str):
                # means we write all the chunks into one file
                file = None if as_files is True else as_files
                path = self._store_bytes_to_file(self.chunks, file=file)
                yield path
            else:
                assert len(as_files) == self.get_number_of_chunks()
                for idx, chunk in enumerate(self.chunks):
                    _file = as_files[idx]
                    path = self._store_bytes_to_file([chunk], file=_file)
                    yield path

    def get_number_of_chunks(self) -> int:
        return len(self.chunks)

    def _get_size(self) -> int:
        size = 0
        for chunk in self.chunks:
            size = size + len(chunk)
        return size

    def _create_cids(self, hash_codec: str) -> Sequence[CID]:
        return [
            self._create_cid_from_chunk(chunk, hash_codec=hash_codec)
            for chunk in self.chunks
        ]


class SerializedFile(SerializedPreStoreChunks):

    type: Literal["file"] = "file"
    file: str = Field(description="A path to a file containing the serialized data.")

    def get_chunks(
        self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
    ) -> Generator[Union[str, BytesLike], None, None]:

        if as_files is False:
            chunk = self._read_bytes_from_file(self.file)
            yield chunk
        else:
            if as_files is True:
                yield self.file
            else:
                if isinstance(as_files, str):
                    file = as_files
                else:
                    assert len(as_files) == 1
                    file = as_files[0]
                if os.path.exists(file):
                    raise Exception(f"Can't write to file '{file}': file exists.")
                if symlink_ok:
                    os.symlink(self.file, file)
                    yield file
                else:
                    raise NotImplementedError()

    def get_number_of_chunks(self) -> int:
        return 1

    def _get_size(self) -> int:
        return os.path.getsize(os.path.realpath(self.file))

    def _create_cids(self, hash_codec: str) -> Sequence[CID]:
        return [self._create_cid_from_file(self.file, hash_codec=hash_codec)]


class SerializedFiles(SerializedPreStoreChunks):

    type: Literal["files"] = "files"
    files: List[str] = Field(
        description="A list of strings, pointing to files containing parts of the serialized data."
    )

    def get_chunks(
        self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
    ) -> Generator[Union[str, BytesLike], None, None]:
        raise NotImplementedError()

    def get_number_of_chunks(self) -> int:
        return len(self.files)

    def _get_size(self) -> int:

        size = 0
        for file in self.files:
            size = size + os.path.getsize(os.path.realpath(file))
        return size

    def _create_cids(self, hash_codec: str) -> Sequence[CID]:
        return [
            self._create_cid_from_file(file, hash_codec=hash_codec)
            for file in self.files
        ]


class SerializedInlineJson(SerializedPreStoreChunks):

    type: Literal["inline-json"] = "inline-json"
    inline_data: Any = Field(
        None,
        description="Data that will not be stored externally, but inline in the containing model. This should only contain data types that can be serialized reliably using json (scalars, etc.).",
    )
    _json_cache: Union[bytes, None] = PrivateAttr(default=None)

    def as_json(self) -> bytes:
        assert self.inline_data is not None
        if self._json_cache is None:
            self._json_cache = orjson.dumps(
                self.inline_data,
                option=orjson.OPT_NON_STR_KEYS,
            )
        return self._json_cache

    def get_chunks(
        self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
    ) -> Generator[Union[str, BytesLike], None, None]:

        if as_files is False:
            yield self.as_json()
        else:
            raise NotImplementedError()

    def get_number_of_chunks(self) -> int:
        return 1

    def _get_size(self) -> int:
        return len(self.as_json())

    def _create_cids(self, hash_codec: str) -> Sequence[CID]:
        return [self._create_cid_from_chunk(self.as_json(), hash_codec=hash_codec)]


class SerializedChunkIDs(SerializedChunks):

    type: Literal["chunk-ids"] = "chunk-ids"
    chunk_id_list: List[str] = Field(
        description="A list of chunk ids, which will be resolved via the attached data registry."
    )
    archive_id: Union[uuid.UUID, None] = Field(
        None, description="The preferred data archive to get the chunks from."
    )
    size: int = Field(description="The size of all chunks combined.")
    _data_registry: "DataRegistry" = PrivateAttr(default=None)

    def get_chunks(
        self, as_files: bool = True, symlink_ok: bool = True
    ) -> Generator[Union[str, BytesLike], None, None]:
        """Retrieve the chunks of this value data.

        If 'as_files' is 'True', strings representing paths to files containing the chunk data are yielded. If 'symlink_ok' is also set to 'True', a yielded path could potentially be a symlink, which means the underlying function might not need to copy the file. In this case, you are responsible for never changing the contents of that path.

        If 'as_files' is 'False', 'BytesLike' objects containing the chunk data are yielded directly.

        """

        chunk_ids = self.chunk_id_list
        return self._data_registry.retrieve_chunks(
            chunk_ids=chunk_ids,
            as_files=as_files,
            symlink_ok=symlink_ok,
            archive_id=self.archive_id,
        )

        # return (
        #     self._data_registry.retrieve_chunk(
        #         chunk_id=chunk,
        #         archive_id=self.archive_id,
        #         as_file=as_files,
        #         symlink_ok=symlink_ok,
        #     )
        #     for chunk in self.chunk_id_list
        # )

    def get_number_of_chunks(self) -> int:
        return len(self.chunk_id_list)

    def _get_size(self) -> int:
        return self.size

    def _create_cids(self, hash_codec: str) -> Sequence[CID]:

        result = []
        for chunk_id in self.chunk_id_list:
            cid = CID.decode(chunk_id)
            result.append(cid)

        return result


SERIALIZE_TYPES = {
    "chunk": SerializedBytes,
    "chunks": SerializedListOfBytes,
    "file": SerializedFile,
    "files": SerializedFiles,
    "inline-json": SerializedInlineJson,
    "chunk-ids": SerializedChunkIDs,
}
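# Maps the 'type' discriminator of a serialized-data dict to its model class;
# used by 'SerializationResult.validate_data' below to coerce plain mappings.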


class SerializationMetadata(KiaraModel):

    _kiara_model_id: ClassVar = "metadata.serialized_data"

    environment: Mapping[str, int] = Field(
        description="Hash(es) for the environments the value was created/serialized.",
        default_factory=dict,
    )
    deserialize: Mapping[str, Manifest] = Field(
        description="Suggested manifest configs to use to de-serialize the data.",
        default_factory=dict,
    )


class SerializedData(KiaraModel):

    data_type: str = Field(
        description="The name of the data type for this serialized value."
    )
    data_type_config: Mapping[str, Any] = Field(
        description="The (optional) config for the data type for this serialized value.",
        default_factory=dict,
    )
    serialization_profile: str = Field(
        description="An identifying name for the serialization method used."
    )
    metadata: SerializationMetadata = Field(
        description="Optional metadata describing aspects of the serialization used.",
        default_factory=SerializationMetadata,
    )

    hash_codec: str = Field(
        description="The codec used to hash the value.", default="sha2-256"
    )
    _cids_cache: Dict[str, Sequence[CID]] = PrivateAttr(default_factory=dict)

    _cached_data_size: Union[int, None] = PrivateAttr(default=None)
    _cached_dag: Union[Dict[str, Sequence[CID]], None] = PrivateAttr(default=None)
    # _cached_cid: Optional[CID] = PrivateAttr(default=None)

    def _retrieve_data_to_hash(self) -> Any:

        return self.dag

    @property
    def data_size(self) -> int:
        if self._cached_data_size is not None:
            return self._cached_data_size

        size = 0
        for k in self.get_keys():
            model = self.get_serialized_data(k)
            size = size + model.get_size()
        self._cached_data_size = size
        return self._cached_data_size

    @abc.abstractmethod
    def get_keys(self) -> Iterable[str]:
        pass

    @abc.abstractmethod
    def get_serialized_data(self, key: str) -> SerializedChunks:
        pass

    def get_cids_for_key(self, key: str) -> Sequence[CID]:

        if key in self._cids_cache.keys():
            return self._cids_cache[key]

        model = self.get_serialized_data(key)
        self._cids_cache[key] = model.get_cids(hash_codec=self.hash_codec)
        return self._cids_cache[key]

    @property
    def dag(self) -> Mapping[str, Sequence[CID]]:

        if self._cached_dag is not None:
            return self._cached_dag

        dag: Dict[str, Sequence[CID]] = {}
        for key in self.get_keys():
            dag[key] = self.get_cids_for_key(key)

        self._cached_dag = dag
        return self._cached_dag


class SerializationResult(SerializedData):

    _kiara_model_id: ClassVar[str] = "instance.serialization_result"

    data: Dict[
        str,
        Union[
            SerializedBytes,
            SerializedListOfBytes,
            SerializedFile,
            SerializedFiles,
            SerializedInlineJson,
        ],
    ] = Field(
        description="One or several byte arrays representing the serialized state of the value."
    )

    def get_keys(self) -> Iterable[str]:
        return self.data.keys()

    def get_serialized_data(self, key: str) -> SerializedChunks:
        return self.data[key]

    @model_validator(mode="before")
    @classmethod
    def validate_data(cls, values):

        codec = values.get("hash_codec", None)
        if codec is None:
            values["hash_codec"] = "sha2-256"

        v = values.get("data")
        assert isinstance(v, Mapping)

        result = {}
        for field_name, data in v.items():
            if isinstance(data, SerializedChunks):
                result[field_name] = data
            elif isinstance(data, Mapping):
                s_type = data.get("type", None)
                if not s_type:
                    raise ValueError(
                        f"Invalid serialized data config, missing 'type' key: {data}"
                    )

                if s_type not in SERIALIZE_TYPES.keys():
                    raise ValueError(
                        f"Invalid serialized data type '{s_type}'. Allowed types: {', '.join(SERIALIZE_TYPES.keys())}"
                    )

                assert s_type != "chunk-ids"
                # don't shadow the 'cls' classmethod argument
                model_cls = SERIALIZE_TYPES[s_type]
                result[field_name] = model_cls(**data)

        values["data"] = result
        return values
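
    # Illustrative sketch of the kind of input the validator above coerces (hypothetical values):
    #
    #   SerializationResult(
    #       data_type="string",
    #       serialization_profile="json",
    #       data={"bytes": {"type": "chunk", "codec": "raw", "chunk": b"..."}},
    #   )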

    def create_renderable(self, **config: Any) -> RenderableType:

        table = Table(show_header=False, box=box.SIMPLE)
        table.add_column("key")
        table.add_column("value")
        table.add_row("data_type", self.data_type)
        _config = Syntax(
            orjson_dumps(self.data_type_config), "json", background_color="default"
        )
        table.add_row("data_type_config", _config)

        data_fields = {}
        for field, model in self.data.items():
            data_fields[field] = {"type": model.type}
        data_json = Syntax(
            orjson_dumps(data_fields), "json", background_color="default"
        )
        table.add_row("data", data_json)
        table.add_row("size", str(self.data_size))
        table.add_row("hash", self.instance_id)

        return table

    def __repr__(self):

        return f"{self.__class__.__name__}(type={self.data_type} size={self.data_size})"

    def __str__(self):
        return self.__repr__()


class PersistedData(SerializedData):

    _kiara_model_id: ClassVar = "instance.persisted_data"

    archive_id: uuid.UUID = Field(
        description="The id of the store that persisted the data."
    )
    chunk_id_map: Mapping[str, SerializedChunkIDs] = Field(
        description="Reference-ids that resolve to the values' serialized chunks."
    )

    def get_keys(self) -> Iterable[str]:
        return self.chunk_id_map.keys()

    def get_serialized_data(self, key: str) -> SerializedChunks:
        return self.chunk_id_map[key]


class ValuePedigree(InputsManifest):

    _kiara_model_id: ClassVar = "instance.value_pedigree"

    kiara_id: uuid.UUID = Field(
        description="The id of the kiara context a value was created in."
    )
    environments: Dict[str, str] = Field(
        description="References to the runtime environment details a value was created in."
    )

    def _retrieve_data_to_hash(self) -> Any:
        return {
            "manifest": self.manifest_cid,
            "inputs": self.inputs_cid,
            "environments": self.environments,
        }

    def __repr__(self):
        return f"ValuePedigree(module_type={self.module_type}, inputs=[{', '.join(self.inputs.keys())}], instance_id={self.instance_id})"

    def __str__(self):
        return self.__repr__()


class DataTypeInfo(KiaraModel):

    _kiara_model_id: ClassVar = "info.data_type_instance"

    data_type_name: str = Field(description="The registered name of this data type.")
    data_type_config: Mapping[str, Any] = Field(
        description="The (optional) configuration for this data type.",
        default_factory=dict,
    )
    characteristics: DataTypeCharacteristics = Field(
        description="Characteristics of this data type."
    )
    data_type_class: PythonClass = Field(
        description="The python class that is associated with this model."
    )
    _data_type_instance: "DataType" = PrivateAttr(default=None)

    @property
    def data_type_instance(self) -> "DataType":

        if self._data_type_instance is not None:
            return self._data_type_instance

        self._data_type_instance = self.data_type_class.get_class()(
            **self.data_type_config
        )
        return self._data_type_instance


class ValueDetails(KiaraModel):
    """A wrapper class that manages and retieves value data and its details."""

    _kiara_model_id: ClassVar = "instance.value_details"

    value_id: uuid.UUID = Field(description="The id of the value.")

    kiara_id: uuid.UUID = Field(
        description="The id of the kiara context this value belongs to."
    )

    value_schema: ValueSchema = Field(
        description="The schema that was used for this Value."
    )

    value_created: datetime = Field(
        description="The time when this value was created.",
        default_factory=get_current_time_incl_timezone,
    )
    value_status: ValueStatus = Field(description="The set/unset status of this value.")
    value_size: int = Field(description="The size of this value, in bytes.")
    value_hash: str = Field(description="The hash of this value.")
    pedigree: ValuePedigree = Field(
        description="Information about the module and inputs that went into creating this value."
    )
    pedigree_output_name: str = Field(
        description="The output name that produced this value (using the manifest inside the pedigree)."
    )
    data_type_info: DataTypeInfo = Field(
        description="Information about the data type this value is made of."
    )

    def _retrieve_id(self) -> str:
        return str(self.value_id)

    def _retrieve_data_to_hash(self) -> Any:
        return {
            "value_type": self.value_schema.type,
            "value_hash": self.value_hash,
            "value_size": self.value_size,
        }

    @property
    def data_type_name(self) -> str:
        return self.data_type_info.data_type_name

    @property
    def data_type_config(self) -> Mapping[str, Any]:
        return self.data_type_info.data_type_config

    @property
    def is_optional(self) -> bool:
        return self.value_schema.optional

    @property
    def is_valid(self) -> bool:
        """Check whether the current value is valid."""
        if self.is_optional:
            return True
        else:
            return self.value_status == ValueStatus.SET

    @property
    def is_set(self) -> bool:
        return self.value_status in [ValueStatus.SET, ValueStatus.DEFAULT]

    @property
    def value_status_string(self) -> str:
        """Print a human readable short description of this values status."""
        if self.value_status == ValueStatus.DEFAULT:
            return "set (default)"
        elif self.value_status == ValueStatus.SET:
            return "set"
        elif self.value_status == ValueStatus.NONE:
            result = "no value"
        elif self.value_status == ValueStatus.NOT_SET:
            result = "not set"
        else:
            raise Exception(
                f"Invalid internal status of value '{self.value_id}'. This is most likely a bug."
            )

        if self.is_optional:
            result = f"{result} (not required)"
        return result

    def __repr__(self):

        return f"{self.__class__.__name__}(id={self.value_id}, type={self.data_type_name}, status={self.value_status.value})"

    def __str__(self):

        return self.__repr__()


class Value(ValueDetails):

    _kiara_model_id: ClassVar = "instance.value"

    _value_data: Any = PrivateAttr(default=SpecialValue.NOT_SET)
    _serialized_data: Union[None, str, SerializedData] = PrivateAttr(default=None)
    _data_retrieved: bool = PrivateAttr(default=False)
    _data_registry: "DataRegistry" = PrivateAttr(default=None)
    # _data_type: "DataType" = PrivateAttr(default=None)
    _is_stored: bool = PrivateAttr(default=False)
    _cached_properties: Union["ValueMap", None] = PrivateAttr(default=None)
    _lineage: Union["ValueLineage", None] = PrivateAttr(default=None)

    environment_hashes: Mapping[str, Mapping[str, str]] = Field(
        description="Hashes for the environments this value was created in."
    )
    # enviroments: Union[Mapping[str, Mapping[str, Any]], None] = Field(
    #     description="Information about the environments this value was created in.",
    #     default=None,
    # )
    property_links: Mapping[str, uuid.UUID] = Field(
        description="Links to values that are properties of this value.",
        default_factory=dict,
    )
    destiny_backlinks: Mapping[uuid.UUID, str] = Field(
        description="Backlinks to values that this value acts as destiny/or property for.",
        default_factory=dict,
    )
    job_id: Union[uuid.UUID, None] = Field(
        description="The id of the job that created this value (if applicable).",
        default=None,
    )

    def add_property(
        self,
        value_id: Union[uuid.UUID, "Value"],
        property_path: str,
        add_origin_to_property_value: bool = True,
    ):

        value = None
        if isinstance(value_id, Value):
            # a 'Value' object was provided: keep the reference and extract its id
            value = value_id
            value_id = value.value_id

        if add_origin_to_property_value:
            if value is None:
                value = self._data_registry.get_value(value=value_id)  # type: ignore

            if value._is_stored:
                raise Exception(
                    f"Can't add property to value '{self.value_id}': referenced value '{value.value_id}' already locked, so it's not possible to add the property backlink (as requested)."
                )

        if add_origin_to_property_value:
            assert value is not None

        if self._is_stored:
            raise Exception(
                f"Can't add property to value '{self.value_id}': value already locked."
            )

        if property_path in self.property_links.keys():
            raise Exception(
                f"Can't add property to value '{self.value_id}': property '{property_path}' already set."
            )

        self.property_links[property_path] = value_id  # type: ignore

        if add_origin_to_property_value:
            value.add_destiny_details(
                value_id=self.value_id, destiny_alias=property_path
            )

        self._cached_properties = None

    def add_destiny_details(self, value_id: uuid.UUID, destiny_alias: str):

        if self._is_stored:
            raise Exception(
                f"Can't set destiny_refs to value '{self.value_id}': value already locked."
            )

        self.destiny_backlinks[value_id] = destiny_alias  # type: ignore

    @property
    def is_serializable(self) -> bool:

        try:
            if self._serialized_data == NO_SERIALIZATION_MARKER:
                return False
            self.serialized_data
            return True
        except Exception:
            pass

        return False

    # @property
    # def data_type_class(self) -> "PythonClass":
    #     """Return the (Python) type of the underlying 'DataType' subclass."""
    #     return self.data_type_info.data_type_class

    @property
    def serialized_data(self) -> SerializedData:

        # if not self.is_set:
        #     raise Exception(f"Can't retrieve serialized data: value not set.")

        if self._serialized_data is not None:
            if isinstance(self._serialized_data, str):
                raise Exception(
                    f"Data type '{self.data_type_name}' does not support serializing: {self._serialized_data}"
                )

            return self._serialized_data

        self._serialized_data = self._data_registry.retrieve_persisted_value_details(
            self.value_id
        )
        return self._serialized_data

    @property
    def data(self) -> Any:
        if not self.is_initialized:
            raise Exception(
                f"Can't retrieve data for value '{self.value_id}': value not initialized yet. This is most likely a bug."
            )
        try:
            return self._retrieve_data()
        except DataTypeUnknownException as dtue:
            dtue._value = self
            raise dtue

    def _retrieve_data(self) -> Any:

        if self._value_data is not SpecialValue.NOT_SET:
            return self._value_data

        if self.value_status in [ValueStatus.NOT_SET, ValueStatus.NONE]:
            self._value_data = None
            return self._value_data
        elif self.value_status not in [ValueStatus.SET, ValueStatus.DEFAULT]:
            raise Exception(f"Invalid internal state of value '{self.value_id}'.")

        retrieved = self._data_registry.retrieve_value_data(value=self)

        if retrieved is None or isinstance(retrieved, SpecialValue):
            raise Exception(
                f"Can't set value data, invalid data type: {type(retrieved)}"
            )

        self._value_data = retrieved
        self._data_retrieved = True
        return self._value_data

    # def retrieve_load_config(self) -> Optional[LoadConfig]:
    #     return self._data_registry.retrieve_persisted_value_details(
    #         value_id=self.value_id
    #     )

    def __repr__(self):

        return f"{self.__class__.__name__}(id={self.value_id}, type={self.data_type_name}, status={self.value_status.value}, initialized={self.is_initialized} optional={self.value_schema.optional})"

    def _set_registry(self, data_registry: "DataRegistry") -> None:
        self._data_registry = data_registry

    @property
    def is_initialized(self) -> bool:
        result = not self.is_set or self._data_registry is not None
        return result

    @property
    def is_stored(self) -> bool:
        return self._is_stored

    @property
    def data_type(self) -> "DataType":

        return self.data_type_info.data_type_instance

    @property
    def lineage(self) -> "ValueLineage":
        if self._lineage is not None:
            return self._lineage

        from kiara.models.values.lineage import ValueLineage

        self._lineage = ValueLineage(kiara=self._data_registry._kiara, value=self)
        return self._lineage

    @property
    def property_values(self) -> "ValueMap":
        """Return a dictionary of all of this values properties."""

        if self._cached_properties is not None:
            return self._cached_properties

        self._cached_properties = self._data_registry.load_values(self.property_links)
        return self._cached_properties

    @property
    def property_names(self) -> Iterable[str]:
        return self.property_links.keys()

    def get_property_value(self, property_key: str) -> "Value":

        if property_key not in self.property_links.keys():
            raise Exception(
                f"Value '{self.value_id}' has no property with key '{property_key}'."
            )

        return self._data_registry.get_value(self.property_links[property_key])

    def get_property_data(self, property_key: str) -> Any:

        try:
            return self.get_property_value(property_key=property_key).data
        except Exception as e:
            log_exception(e)
            return None

    def get_all_property_data(self, flatten_models: bool = False) -> Mapping[str, Any]:

        result = {k: self.get_property_data(k) for k in self.property_names}
        if not flatten_models:
            return result

        flat = {}
        for k, v in result.items():
            if hasattr(v, "model_dump"):
                flat[k] = v.model_dump()
            elif hasattr(v, "dict"):
                flat[k] = v.dict()
            else:
                flat[k] = v
        return flat

    def lookup_self_aliases(self) -> Set[str]:

        if not self._data_registry:
            raise Exception(
                f"Can't lookup aliases for value '{self.value_id}': data registry not set (yet)."
            )

        return self._data_registry.lookup_aliases(self)

    def create_info(self) -> "ValueInfo":

        if not self._data_registry:
            raise Exception(
                f"Can't create info object for value '{self.value_id}': data registry not set (yet)."
            )

        return self._data_registry.create_value_info(value=self.value_id)

    def create_info_data(self, **config: Any) -> Mapping[str, Any]:

        show_pedigree = config.get("show_pedigree", False)
        show_lineage = config.get("show_lineage", False)
        show_properties = config.get("show_properties", False)
        # show_destinies = config.get("show_destinies", False)
        # show_destiny_backlinks = config.get("show_destiny_backlinks", False)
        # show_data = config.get("show_data_preview", False)
        show_serialized = config.get("show_serialized", False)
        show_env_data_hashes = config.get("show_environment_hashes", False)
        show_env_data = config.get("show_environment_data", False)

        ignore_fields = config.get("ignore_fields", [])

        table: Dict[str, Any] = {}

        if "value_id" not in ignore_fields:
            table["value_id"] = self.value_id
        if "aliases" not in ignore_fields:
            if hasattr(self, "aliases"):
                table["aliases"] = self.aliases  # type: ignore

        if "kiara_id" not in ignore_fields:
            table["kiara_id"] = self.kiara_id

        for k in sorted(self.model_fields.keys()):

            if (
                k
                in [
                    "serialized",
                    "value_id",
                    "aliases",
                    "kiara_id",
                    "environments",
                    "lineage",
                    "environment_hashes",
                ]
                or k in ignore_fields
            ):
                continue

            attr = getattr(self, k)
            if k in ["pedigree_output_name", "pedigree"]:
                continue
            else:
                v = attr

            table[k] = v

        if show_pedigree:
            pedigree = getattr(self, "pedigree")

            table["pedigree"] = pedigree
            if pedigree == ORPHAN:
                pedigree_output_name: Union[Any, None] = None
            else:
                pedigree_output_name = getattr(self, "pedigree_output_name")

            table["pedigree_output_name"] = pedigree_output_name

        if show_lineage:
            table["lineage"] = self.lineage

        if show_serialized:
            serialized = self._data_registry.retrieve_persisted_value_details(
                self.value_id
            )
            table["serialized"] = serialized

        if show_env_data_hashes:
            env_hashes = Syntax(
                orjson_dumps(self.environment_hashes, option=orjson.OPT_INDENT_2),
                "json",
                background_color="default",
            )
            table["environment_hashes"] = env_hashes

        if show_env_data:
            raise NotImplementedError()

        if show_properties:
            if not self.property_links:
                table["properties"] = {}
            else:
                properties = self._data_registry.load_values(self.property_links)
                table["properties"] = properties

        # if hasattr(self, "destiny_links") and show_destinies:
        #     if not self.destiny_links:  # type: ignore
        #         table["destinies"] = {}
        #     else:
        #         destinies = self._data_registry.load_values(self.destiny_links)  # type: ignore
        #         table["destinies"] = destinies
        #
        # if show_destiny_backlinks:
        #     if not self.destiny_backlinks:
        #         table["destiny backlinks"] = {}
        #     else:
        #         destiny_items: List[Any] = []
        #         for v_id, alias in self.destiny_backlinks.items():
        #             destiny_items.append(
        #                 f"[b]Value: [i]{v_id}[/i] (destiny alias: {alias})[/b]"
        #             )
        #             rendered = self._data_registry.pretty_print_data(
        #                 value_id=v_id, **config
        #             )
        #             destiny_items.append(rendered)
        #         table["destiny backlinks"] = destiny_items
        #
        # if show_data:
        #     rendered = self._data_registry.pretty_print_data(
        #         self.value_id, target_type="terminal_renderable"
        #     )
        #     table["data preview"] = rendered

        return table

    def create_renderable(self, **render_config: Any) -> RenderableType:

        from kiara.utils.output import extract_renderable

        show_pedigree = render_config.get("show_pedigree", False)
        show_lineage = render_config.get("show_lineage", False)
        show_properties = render_config.get("show_properties", False)
        show_destinies = render_config.get("show_destinies", False)
        show_destiny_backlinks = render_config.get("show_destiny_backlinks", False)
        show_data = render_config.get("show_data_preview", False)
        show_serialized = render_config.get("show_serialized", False)
        show_env_data_hashes = render_config.get("show_environment_hashes", False)
        show_env_data = render_config.get("show_environment_data", False)

        ignore_fields = render_config.get("ignore_fields", [])

        table = Table(show_header=False, box=box.SIMPLE)
        table.add_column("Key", style="i")
        table.add_column("Value")

        info_data = self.create_info_data(**render_config)

        if "value_id" not in ignore_fields:
            table.add_row("value_id", str(info_data["value_id"]))
        if "aliases" not in ignore_fields:
            if info_data.get("aliases", None):
                aliases_str = ", ".join(info_data["aliases"])  # type: ignore
                table.add_row("aliases", aliases_str)
            # else:
            #     aliases_str = "-- n/a --"
            #     table.add_row("aliases", aliases_str)

        if "kiara_id" not in ignore_fields:
            table.add_row("kiara_id", str(info_data["kiara_id"]))

        table.add_row("", "")
        table.add_row("", Rule())
        for k in sorted(info_data.keys()):

            if (
                k
                in [
                    "serialized",
                    "value_id",
                    "aliases",
                    "kiara_id",
                    "lineage",
                    "properties",
                    "environments",
                    "environment_hashes",
                ]
                or k in ignore_fields
            ):
                continue

            attr = info_data[k]
            if k in ["pedigree_output_name", "pedigree"]:
                continue

            elif k == "value_status":
                v: RenderableType = f"[i]-- {attr.value} --[/i]"
            elif k == "value_size":
                v = format_size(attr)
            else:
                v = extract_renderable(attr)

            table.add_row(k, v)

        if (
            show_pedigree
            or show_lineage
            or show_serialized
            or show_properties
            or show_destinies
            or show_destiny_backlinks
            or show_env_data_hashes
            or show_env_data
        ):
            table.add_row("", "")
            table.add_row("", Rule())
            table.add_row("", "")

        if show_pedigree:
            pedigree = info_data["pedigree"]

            if pedigree == ORPHAN:
                v = "[i]-- external data --[/i]"
                pedigree_output_name: Union[Any, None] = None
            else:
                v = extract_renderable(pedigree)
                pedigree_output_name = info_data["pedigree_output_name"]

            row = ["pedigree", v]
            table.add_row(*row)
            if pedigree_output_name:
                row = ["pedigree_output_name", pedigree_output_name]
                table.add_row(*row)

        if show_lineage:
            table.add_row(
                "lineage", info_data["lineage"].create_renderable(include_ids=True)
            )

        if show_serialized:
            serialized = info_data["serialized"]
            table.add_row("serialized", serialized.create_renderable())

        if show_env_data_hashes:
            env_hashes = Syntax(
                orjson_dumps(
                    info_data["environment_hashes"], option=orjson.OPT_INDENT_2
                ),
                "json",
                background_color="default",
            )
            table.add_row("environment_hashes", env_hashes)

        if show_env_data:
            raise NotImplementedError()

        if show_properties:
            if not info_data["properties"]:
                table.add_row("properties", "{}")
            else:
                properties = info_data["properties"]
                pr = properties.create_renderable(show_header=False)
                table.add_row("properties", pr)

        if hasattr(self, "destiny_links") and show_destinies:
            if not self.destiny_links:  # type: ignore
                table.add_row("destinies", "{}")
            else:
                destinies = self._data_registry.load_values(self.destiny_links)  # type: ignore
                dr = destinies.create_renderable(show_header=False)
                table.add_row("destinies", dr)

        if show_destiny_backlinks:
            if not self.destiny_backlinks:
                table.add_row("destiny backlinks", "{}")
            else:
                destiny_items: List[Any] = []
                for v_id, alias in self.destiny_backlinks.items():
                    destiny_items.append(Rule())
                    destiny_items.append(
                        f"[b]Value: [i]{v_id}[/i] (destiny alias: {alias})[/b]"
                    )
                    rendered = self._data_registry.pretty_print_data(
                        value_id=v_id, **render_config
                    )
                    destiny_items.append(rendered)
                table.add_row("destiny backlinks", Group(*destiny_items))

        if show_data:
            rendered = self._data_registry.pretty_print_data(
                self.value_id, target_type="terminal_renderable"
            )
            table.add_row("", "")
            table.add_row("", Rule())
            table.add_row("data preview", rendered)

        return table


class UnloadableData(KiaraModel):
    """
    A special 'marker' model, indicating that the data of a value can't be loaded.

    In most cases, the reason this happens is because the current kiara context is missing some value types and/or modules.
    """

    _kiara_model_id: ClassVar = "instance.unloadable_data"

    value: Value = Field(description="A reference to the value.")

    def _retrieve_id(self) -> str:
        return self.value.instance_id

    def _retrieve_data_to_hash(self) -> Any:
        return self.value.value_id.bytes


class ValueMap(KiaraModel, MutableMapping[str, Value]):  # type: ignore

    values_schema: Dict[str, ValueSchema] = Field(
        description="The schemas for all the values in this set."
    )

    @property
    def field_names(self) -> Iterable[str]:
        return sorted(self.values_schema.keys())

    @abc.abstractmethod
    def get_value_obj(self, field_name: str) -> Value:
        pass

    @property
    def all_items_valid(self) -> bool:
        for field_name in self.values_schema.keys():
            item = self.get_value_obj(field_name)
            if not item.is_valid:
                return False
        return True

    def _retrieve_data_to_hash(self) -> Any:
        return {
            k: self.get_value_obj(k).instance_cid for k in self.values_schema.keys()
        }

    def check_invalid(self) -> Dict[str, str]:
        """Check whether the value set is invalid, if it is, return a description of what's wrong."""
        invalid: Dict[str, str] = {}
        for field_name in self.values_schema.keys():

            item = self.get_value_obj(field_name)
            field_schema = self.values_schema[field_name]
            if not field_schema.optional:
                msg: Union[str, None] = None
                if item.value_status != ValueStatus.SET:

                    if field_schema.is_required():

                        if not item.is_set:
                            msg = "not set"
                        elif item.value_status == ValueStatus.NONE:
                            msg = "no value"
                if msg:
                    invalid[field_name] = msg

        return invalid

    def get_value_data_for_fields(
        self, *field_names: str, raise_exception_when_unset: bool = False
    ) -> Dict[str, Any]:
        """
        Return the data for one or several fields of this ValueMap.

        If a value is unset, 'None' is returned for it by default. If 'raise_exception_when_unset' is set to
        'True', an exception is raised instead.
        """
        if raise_exception_when_unset:
            unset: List[str] = []
            for k in field_names:
                v = self.get_value_obj(k)
                if not v.is_set:
                    unset.append(k)
            if unset:
                raise Exception(
                    f"Can't get data for fields, one or several of the requested fields are not set yet: {', '.join(unset)}."
                )

        result: Dict[str, Any] = {}
        for k in field_names:
            v = self.get_value_obj(k)
            if not v.is_set:
                result[k] = None
            else:
                result[k] = v.data
        return result
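
    # Illustrative sketch (hypothetical field names 'a' and 'b'):
    #
    #   value_map.get_value_data_for_fields("a", "b")
    #   # -> {"a": <data>, "b": None}   ('None' for any unset field, by default)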

    def get_value_data(
        self, field_name: str, raise_exception_when_unset: bool = False
    ) -> Any:
        return self.get_value_data_for_fields(
            field_name, raise_exception_when_unset=raise_exception_when_unset
        )[field_name]

    def get_all_value_ids(self) -> Dict[str, uuid.UUID]:
        return {k: self.get_value_obj(k).value_id for k in self.field_names}

    def get_all_value_data(
        self, raise_exception_when_unset: bool = False
    ) -> Dict[str, Any]:
        return self.get_value_data_for_fields(
            *self.field_names,
            raise_exception_when_unset=raise_exception_when_unset,
        )

    def set_values(self, **values) -> None:

        for k, v in values.items():
            self.set_value(k, v)

    def set_value(self, field_name: str, data: Any) -> None:
        raise Exception(
            f"The value set implementation '{self.__class__.__name__}' is read-only, and does not support the setting or changing of values."
        )

    def __getitem__(self, item: str) -> Value:

        return self.get_value_obj(item)

    def __setitem__(self, key: str, value):

        raise NotImplementedError()
        # self.set_value(key, value)

    def __delitem__(self, key: str):

        raise Exception(f"Removing items not supported: {key}")

    def __iter__(self):
        return iter(self.field_names)

    def __len__(self):
        return len(self.values_schema)

    def __repr__(self):
        return f"{self.__class__.__name__}(field_names={self.field_names})"

    def __str__(self):
        return self.__repr__()

    def create_invalid_renderable(self, **config) -> Union[RenderableType, None]:

        inv = self.check_invalid()
        if not inv:
            return None

        table = Table(show_header=False, box=box.SIMPLE)
        table.add_column("field name", style="i")
        table.add_column("details", style="b red")

        for field, err in inv.items():
            table.add_row(field, err)

        return table

    def create_renderable(self, **config: Any) -> RenderableType:

        in_panel = config.get("in_panel", None)
        if in_panel is None:
            if is_jupyter():
                in_panel = True
            else:
                in_panel = False

        render_value_data = config.get("render_value_data", True)
        field_title = config.get("field_title", "field")
        value_title = config.get("value_title", "value")
        show_header = config.get("show_header", True)
        show_type = config.get("show_data_type", False)

        table = Table(show_lines=False, show_header=show_header, box=box.SIMPLE)
        table.add_column(field_title, style="b")
        if show_type:
            table.add_column("data_type")
        table.add_column(value_title, style="i")

        for field_name in self.field_names:

            value = self.get_value_obj(field_name=field_name)
            if render_value_data:
                rendered = value._data_registry.pretty_print_data(
                    value_id=value.value_id, target_type="terminal_renderable", **config
                )
            else:
                rendered = value.create_renderable(**config)

            if show_type:
                table.add_row(field_name, value.value_schema.type, rendered)
            else:
                table.add_row(field_name, rendered)

        if in_panel:
            return Panel(table)
        else:
            return table


class ValueMapReadOnly(ValueMap):  # type: ignore

    _kiara_model_id: ClassVar = "instance.value_map.readonly"

    @classmethod
    def create_from_ids(cls, data_registry: "DataRegistry", **value_ids: uuid.UUID):

        values = {k: data_registry.get_value(v) for k, v in value_ids.items()}
        values_schema = {k: v.value_schema for k, v in values.items()}
        return ValueMapReadOnly(value_items=values, values_schema=values_schema)

    @classmethod
    def create_from_values(cls, **values: Value) -> "ValueMapReadOnly":

        values_schema = {k: v.value_schema for k, v in values.items()}
        return ValueMapReadOnly(value_items=values, values_schema=values_schema)

    value_items: Dict[str, Value] = Field(
        description="The values contained in this set."
    )

    def get_value_obj(self, field_name: str) -> Value:

        if field_name not in self.value_items.keys():
            raise KeyError(
                f"Field '{field_name}' not available in value set. Available fields: {', '.join(self.field_names)}"
            )
        return self.value_items[field_name]
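
# Illustrative usage sketch (assumes 'value_a' and 'value_b' are existing Value instances):
#
#   vm = ValueMapReadOnly.create_from_values(a=value_a, b=value_b)
#   vm.get_value_data("a")        # -> the data of 'value_a'
#   vm["b"]                       # -> 'value_b' (via __getitem__ -> get_value_obj)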


class ValueMapWritable(ValueMap):  # type: ignore

    _kiara_model_id: ClassVar = "instance.value_map.writeable"

    @classmethod
    def create_from_schema(
        cls,
        kiara: "Kiara",
        schema: Mapping[str, ValueSchema],
        pedigree: ValuePedigree,
        unique_value_ids: bool = False,
    ) -> "ValueMapWritable":

        v = ValueMapWritable(
            values_schema=dict(schema),
            pedigree=pedigree,
            unique_value_ids=unique_value_ids,
        )
        v._kiara = kiara
        v._data_registry = kiara.data_registry
        return v

    value_items: Dict[str, Value] = Field(
        description="The values contained in this set.", default_factory=dict
    )
    pedigree: ValuePedigree = Field(
        description="The pedigree to add to all of the result values."
    )
    unique_value_ids: bool = Field(
        description="Whether this value map always creates new value(id)s, even when a dataset with matching hash is found.",
        default=True,
    )

    _values_uncommitted: Dict[str, Any] = PrivateAttr(default_factory=dict)
    _kiara: "Kiara" = PrivateAttr(default=None)
    _data_registry: "DataRegistry" = PrivateAttr(default=None)
    _auto_commit: bool = PrivateAttr(default=True)

    def get_value_obj(self, field_name: str) -> Value:
        """
        Retrieve the value object for the specified field.

        This class only creates the actual value object the first time it is requested, because there is a potential
        cost to assembling it, and it might not be needed ever.
        """
        if field_name not in self.values_schema.keys():
            raise Exception(
                f"Can't set data for field '{field_name}': field not valid, valid field names: {', '.join(self.field_names)}."
            )

        if field_name in self.value_items.keys():
            return self.value_items[field_name]
        elif field_name not in self._values_uncommitted.keys():
            raise Exception(
                f"Can't retrieve value for field '{field_name}': value not set (yet)."
            )

        schema = self.values_schema[field_name]
        value_data = self._values_uncommitted[field_name]
        if isinstance(value_data, Value):
            value = value_data
        elif isinstance(value_data, uuid.UUID):
            value = self._data_registry.get_value(value_data)
        else:
            value = self._data_registry.register_data(
                data=value_data,
                schema=schema,
                pedigree=self.pedigree,
                pedigree_output_name=field_name,
                reuse_existing=not self.unique_value_ids,
            )

        self._values_uncommitted.pop(field_name)
        self.value_items[field_name] = value
        return self.value_items[field_name]

    def sync_values(self):

        for field_name in self.field_names:
            self.get_value_obj(field_name)

        invalid = self.check_invalid()
        if invalid:
            e = InvalidValuesException(invalid_values=invalid)
            try:
                raise e
            except Exception:
                # this is silly, I know
                log_exception(e)
                raise e

    def set_value(self, field_name: str, data: Any) -> None:
        """Set the value for the specified field."""
        if field_name not in self.field_names:
            raise Exception(
                f"Can't set data for field '{field_name}': field not valid, valid field names: {', '.join(self.field_names)}."
            )
        if self.value_items.get(field_name, False):
            raise Exception(
                f"Can't set data for field '{field_name}': field already committed."
            )
        if self._values_uncommitted.get(field_name, None) is not None:
            raise Exception(
                f"Can't set data for field '{field_name}': field already set."
            )

        self._values_uncommitted[field_name] = data
        if self._auto_commit:
            self.get_value_obj(field_name=field_name)
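
# Illustrative lifecycle sketch for a writable map (hypothetical 'kiara'/'schema'/'pedigree' objects):
#
#   vm = ValueMapWritable.create_from_schema(kiara, schema, pedigree)
#   vm.set_value("result", 42)    # registers the data via the data registry (auto-commit)
#   vm.sync_values()              # materializes all fields and raises on invalid values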


ValuePedigree.model_rebuild()
ORPHAN = ValuePedigree(
    kiara_id=VOID_KIARA_ID,
    environments={},
    module_type=NO_MODULE_TYPE,
    inputs={},
    is_resolved=True,
)
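# 'ORPHAN' marks values without a recorded provenance (e.g. externally imported data);
# 'Value.create_renderable' displays such a pedigree as '-- external data --'.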
# GENESIS_PEDIGREE = None