Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# -*- coding: utf-8 -*-
from typing import TYPE_CHECKING, ClassVar, Iterable, Mapping, Union

import structlog
from pydantic import Field

from kiara.models.documentation import DocumentationMetadataModel
from kiara.models.module.operation import (
    BaseOperationDetails,
    ManifestOperationConfig,
    OperationConfig,
)
from kiara.models.values.value_schema import ValueSchema
from kiara.modules.included_core_modules.create_from import CreateFromModule
from kiara.operations import OperationType
from kiara.utils import log_exception

if TYPE_CHECKING:
    from kiara.modules import KiaraModule

# Module-level structlog logger; used below to emit debug events when an
# operation config or a module is skipped.
logger = structlog.getLogger()


class CreateValueFromDetails(BaseOperationDetails):
    """Details describing a single 'create_from' operation.

    Records the data type the operation reads (``source_type``), the data type
    it produces (``target_type``), and the schemas of any additional inputs
    (``optional_args``).
    """

    source_type: str = Field(description="The type of the source value.")
    target_type: str = Field(description="The result type.")
    optional_args: Mapping[str, ValueSchema] = Field(description="Optional arguments.")

    # NOTE: the schema-retrieval methods below are disabled; they are kept
    # verbatim for reference only.
    #
    # def retrieve_inputs_schema(self) -> ValueSetSchema:
    #
    #     result: Dict[str, Union[ValueSchema, Dict[str, Any]]] = {
    #         self.source_type: {"type": self.source_type, "doc": "The source value."},
    #     }
    #     for field, schema in self.optional_args.items():
    #         if field in result.keys():
    #             raise Exception(
    #                 f"Can't create 'create_from' operation '{self.source_type}' -> '{self.target_type}': duplicate input field '{field}'."
    #             )
    #         result[field] = schema
    #     return result
    #
    # def retrieve_outputs_schema(self) -> ValueSetSchema:
    #
    #     return {
    #         self.target_type: {"type": self.target_type, "doc": "The result value."}
    #     }


class CreateFromOperationType(OperationType[CreateValueFromDetails]):
    """Create a value from a value of another type.

    This operation always has at least one input named after the source type, and one output named after the target type. It can,
    in addition, also have other, optional inputs, to control how exactly the target value is created.
    """

    _operation_type_name: ClassVar[str] = "create_from"

    def _calculate_op_id(self, source_type: str, target_type: str) -> str:
        """Build the operation id for a source/target type combination.

        Arguments:
            source_type: name of the data type the value is created from
            target_type: name of the data type that is created

        Returns:
            ``create.<target_type>`` if the source type is ``any``, otherwise
            ``create.<target_type>.from.<source_type>``.
        """
        if source_type == "any":
            return f"create.{target_type}"
        return f"create.{target_type}.from.{source_type}"

    @staticmethod
    def _find_type_named_field(
        schemas: Mapping[str, ValueSchema], role: str
    ) -> Union[str, None]:
        """Find the single field whose name is identical to its schema type.

        Arguments:
            schemas: a mapping of field name to value schema
            role: 'source' or 'target'; only used in the debug message

        Returns:
            The matching field name, or ``None`` if no field matches or if more
            than one field matches (the latter is logged at debug level).
        """
        match = None
        for field_name, schema in schemas.items():
            if field_name != schema.type:
                continue
            if match is not None:
                # A second candidate makes the operation ambiguous, so give up.
                logger.debug(
                    "ignore.operation",
                    operation_type="create_from",
                    reason=f"more than one possible {role} type field: {field_name}",
                )
                return None
            match = field_name
        return match

    def retrieve_included_operation_configs(
        self,
    ) -> Iterable[Union[Mapping, OperationConfig]]:
        """Collect operation configs for all registered 'create' combinations.

        Every registered module class that exposes a
        ``retrieve_supported_create_combinations`` attribute is asked for its
        combinations. A combination is skipped (with a debug log entry) if its
        source or target type is not a registered data type, or if the named
        create function does not exist on the module class. Any exception
        raised while processing a module class is logged and that class is
        skipped.

        Returns:
            The collected operation configs, one per operation id (a later
            combination with the same operation id replaces an earlier one).
        """
        result = {}
        for name, module_cls in self._kiara.module_type_classes.items():
            if not hasattr(module_cls, "retrieve_supported_create_combinations"):
                continue

            try:
                supported_combinations = module_cls.retrieve_supported_create_combinations()  # type: ignore
                for sup_comb in supported_combinations:
                    source_type = sup_comb["source_type"]
                    target_type = sup_comb["target_type"]
                    func = sup_comb["func"]

                    if source_type not in self._kiara.data_type_names:
                        logger.debug(
                            "ignore.operation_config",
                            module_type=name,
                            reason=f"Source type '{source_type}' not registered.",
                        )
                        continue
                    if target_type not in self._kiara.data_type_names:
                        logger.debug(
                            "ignore.operation_config",
                            module_type=name,
                            reason=f"Target type '{target_type}' not registered.",
                        )
                        continue
                    if not hasattr(module_cls, func):
                        logger.debug(
                            "ignore.operation_config",
                            module_type=name,
                            reason=f"Specified create function '{func}' not available.",
                        )
                        continue

                    module_config = {
                        "source_type": source_type,
                        "target_type": target_type,
                    }
                    # TODO: check whether module config actually supports those, for now, only 'CreateFromModule' subtypes are supported
                    # The operation documentation is taken from the create function itself.
                    doc = DocumentationMetadataModel.from_function(
                        getattr(module_cls, func)
                    )

                    op_config = ManifestOperationConfig(
                        module_type=name, module_config=module_config, doc=doc
                    )
                    op_id = self._calculate_op_id(
                        source_type=source_type, target_type=target_type
                    )
                    result[op_id] = op_config
            except Exception as e:
                # Best effort: a single broken module must not prevent the
                # remaining modules from being registered.
                log_exception(e)
                logger.debug(
                    "ignore.create_operation_instance", module_type=name, reason=e
                )
                continue

        return result.values()

    def check_matching_operation(
        self, module: "KiaraModule"
    ) -> Union[CreateValueFromDetails, None]:
        """Check whether a module qualifies as a 'create_from' operation.

        The module must be a ``CreateFromModule`` with exactly one input field
        and exactly one output field whose name equals its schema type; those
        fields determine the source and target type respectively.

        Arguments:
            module: the module instance to check

        Returns:
            The operation details if the module matches, otherwise ``None``.
        """
        if not isinstance(module, CreateFromModule):
            return None

        source_type = self._find_type_named_field(module.inputs_schema, "source")
        if source_type is None:
            return None

        target_type = self._find_type_named_field(module.outputs_schema, "target")
        if target_type is None:
            return None

        op_id = self._calculate_op_id(source_type=source_type, target_type=target_type)

        # The operation is flagged as internal unless the target type has 'any'
        # in its lineage and is not 'any' itself.
        is_internal = not (
            "any" in self._kiara.type_registry.get_type_lineage(target_type)
            and target_type != "any"
        )

        # Every input except the source value is an optional argument.
        optional = {
            field: schema
            for field, schema in module.inputs_schema.items()
            if field != source_type
        }

        details = {
            "module_inputs_schema": module.inputs_schema,
            "module_outputs_schema": module.outputs_schema,
            "operation_id": op_id,
            "source_type": source_type,
            "target_type": target_type,
            "optional_args": optional,
            "is_internal_operation": is_internal,
        }

        result: CreateValueFromDetails = (
            CreateValueFromDetails.create_operation_details(**details)
        )
        return result