Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
kiara / utils / cli / run.py
Size: Mime:
# -*- coding: utf-8 -*-
import sys
import uuid
from pathlib import Path
from typing import Any, Dict, Iterable, List, Mapping, Union

from click import Context as ClickContext
from pydantic import ValidationError
from rich.console import Group, RenderableType
from rich.markdown import Markdown
from rich.rule import Rule

from kiara.api import KiaraAPI, ValueMap
from kiara.exceptions import (
    FailedJobException,
    InvalidCommandLineInvocation,
    KiaraException,
    NoSuchExecutionTargetException,
)
from kiara.interfaces.python_api.utils import create_save_config
from kiara.models.module.operation import Operation

# from kiara.interfaces.python_api.operation import KiaraOperation
from kiara.utils import log_exception
from kiara.utils.cli import dict_from_cli_args, terminal_print, terminal_print_model
from kiara.utils.cli.rich_click import rich_format_operation_help
from kiara.utils.operations import create_operation_status_renderable
from kiara.utils.output import create_table_from_base_model_cls


def _validate_save_option(save: Iterable[str]) -> bool:

    if save:
        for a in save:
            if "=" in a:
                tokens = a.split("=")
                if len(tokens) != 2:
                    raise InvalidCommandLineInvocation(
                        msg=f"Invalid alias format, can only contain a single '=': {a}",
                        error_code=1,
                    )

        return True
    else:
        return False


def validate_operation_in_terminal(
    api: KiaraAPI,
    module_or_operation: Union[str, Path, Mapping[str, Any]],
    allow_external=False,
) -> Operation:

    # kiara_op = KiaraOperation(
    #     kiara=kiara,
    #     operation_name=module_or_operation,
    #     operation_config=module_config,
    # )
    try:
        operation = api.get_operation(operation=module_or_operation)
        # validate that operation config is valid, ignoring inputs for now
        # kiara_op.operation
    except NoSuchExecutionTargetException as nset:

        terminal_print()
        terminal_print(nset)
        terminal_print()
        terminal_print("Existing operations:")
        terminal_print()
        for n in nset.avaliable_targets:
            terminal_print(f"  - [i]{n}[/i]")
        raise InvalidCommandLineInvocation("No such target.", parent=nset, error_code=1)
    except ValidationError as ve:

        renderables: List[RenderableType] = [""]
        renderables.append("Invalid module configuration:")
        renderables.append("")
        for error in ve.errors():
            loc = ", ".join(error["loc"])  # type: ignore
            renderables.append(f"  [b]{loc}[/b]: [red]{error['msg']}[/red]")

        try:
            if isinstance(module_or_operation, str):
                m = api.context.module_registry.get_module_class(module_or_operation)
                schema = create_table_from_base_model_cls(m._config_cls)
                renderables.append("")
                renderables.append(f"Module configuration schema for '[b i]{m._module_type_name}[/b i]':")  # type: ignore
                renderables.append("")
                renderables.append(schema)
        except Exception:
            pass

        msg = Group(*renderables)
        terminal_print()
        terminal_print(msg, in_panel="[b red]Module configuration error[/b red]")
        raise InvalidCommandLineInvocation(
            "Invalid module config.", parent=ve, error_code=1
        )

    except Exception as e:
        log_exception(e)
        terminal_print()
        terminal_print(
            f"Error when trying to validate the operation [i]'{module_or_operation}'[/i]:\n"
        )
        terminal_print(f"    [red]{e}[/red]")
        root_cause = KiaraException.get_root_details(e)
        if root_cause:
            terminal_print()
            terminal_print(Markdown(root_cause))
        raise InvalidCommandLineInvocation(
            "Can't validate operation.", parent=e, error_code=1
        )

    return operation


def calculate_aliases(
    operation: Operation,
    alias_tokens: Iterable[str],
    extra_aliases: Union[Mapping[str, str], None] = None,
) -> Mapping[str, List[str]]:

    if not alias_tokens:
        aliases: Dict[str, List[str]] = {}
        full_aliases: List[str] = []
    else:
        aliases = {}
        full_aliases = []
        for a in alias_tokens:
            if "=" not in a:
                full_aliases.append(a)
            else:
                tokens = a.split("=")
                if len(tokens) != 2:
                    terminal_print()
                    terminal_print(
                        f"Invalid alias format, can only contain a single '=': {a}"
                    )
                    sys.exit(1)

                aliases.setdefault(tokens[0], []).append(tokens[1])

    # =========================================================================
    # check save user input
    final_aliases: Dict[str, List[str]] = {}
    if alias_tokens:
        op_output_names = operation.outputs_schema.keys()
        invalid_fields = []
        for field_name, alias in aliases.items():
            if field_name not in op_output_names:
                invalid_fields.append(field_name)
            else:
                final_aliases[field_name] = alias

        for _alias in full_aliases:
            for field_name in op_output_names:
                final_aliases.setdefault(field_name, []).append(
                    f"{_alias}.{field_name}"
                )

        if invalid_fields:
            terminal_print()
            terminal_print(
                f"Can't run workflow, invalid field name(s) when specifying aliases: {', '.join(invalid_fields)}. Valid field names: {', '.join(op_output_names)}"
            )
            sys.exit(1)

    if extra_aliases:
        op_output_names = operation.outputs_schema.keys()
        invalid_fields = []
        for field_name, _alias in extra_aliases.items():
            if field_name not in op_output_names:
                invalid_fields.append(field_name)
            elif _alias not in final_aliases:
                final_aliases.setdefault(field_name, []).append(_alias)

        if invalid_fields:
            terminal_print()
            terminal_print(
                f"Can't run workflow, invalid field name(s) in extra save aliases: {', '.join(invalid_fields)}. Valid field names: {', '.join(op_output_names)}"
            )
            sys.exit(1)

    return final_aliases


def set_and_validate_inputs(
    api: KiaraAPI,
    operation: Operation,
    inputs: Iterable[str],
    explain: bool,
    print_help: bool,
    click_context: ClickContext,
    cmd_help: str,
    base_inputs: Union[None, Mapping[str, Any]] = None,
) -> Union[ValueMap, None]:

    # =========================================================================
    # prepare inputs
    list_keys = []
    for (
        name,
        value_schema,
    ) in operation.operation_details.inputs_schema.items():
        if value_schema.type in ["list", "kiara_model_list"]:
            list_keys.append(name)

    try:
        inputs_dict: Dict[str, Any] = dict_from_cli_args(*inputs, list_keys=list_keys)
        if base_inputs:
            for k, v in base_inputs.items():
                if k not in inputs_dict.keys():
                    inputs_dict[k] = v

        value_map = api.assemble_value_map(
            values=inputs_dict,
            values_schema=operation.inputs_schema,
            register_data=True,
            reuse_existing_data=False,
        )
    except Exception as e:
        log_exception(e)
        terminal_print()
        rg = Group(
            "",
            f"Can't run operation: {e}",
            "",
            Rule(),
            "",
            create_operation_status_renderable(
                operation=operation,
                inputs=None,
                render_config={
                    "show_operation_name": True,
                    "show_inputs": False,
                    "show_outputs_schema": True,
                },
            ),
        )
        terminal_print(rg, in_panel=f"Run info: [b]{operation.operation_id}[/b]")
        sys.exit(1)

    if value_map.check_invalid():
        terminal_print()
        rg = Group(
            "",
            "Can't run operation: invalid or insufficient input(s)",
            "",
            Rule(),
            "",
            create_operation_status_renderable(
                operation=operation,
                inputs=value_map,
                render_config={
                    "show_operation_name": True,
                    "show_inputs": True,
                    "show_outputs_schema": True,
                },
            ),
        )
        terminal_print(rg, in_panel=f"Run info: [b]{operation.operation_id}[/b]")
        raise InvalidCommandLineInvocation(
            msg="Invalid or insufficient input(s)", error_code=1
        )

    if print_help:
        rich_format_operation_help(
            obj=click_context.command,
            ctx=click_context,
            operation=operation,
            op_inputs=value_map,
            cmd_help=cmd_help,
        )
        return None

    if explain:
        terminal_print()
        rg = Group(
            "",
            create_operation_status_renderable(
                operation=operation,
                inputs=value_map,
                render_config={
                    "show_operation_name": True,
                    "show_inputs": True,
                    "show_outputs_schema": True,
                },
            ),
        )
        terminal_print(rg, in_panel=f"Operation info: [b]{operation.operation_id}[/b]")
        sys.exit(0)

    if value_map.check_invalid():
        terminal_print()
        rg = Group(
            "",
            "Can't run operation: invalid or insufficient input(s)",
            "",
            Rule(),
            "",
            create_operation_status_renderable(
                operation=operation,
                inputs=value_map,
                render_config={
                    "show_operation_name": True,
                    "show_inputs": True,
                    "show_outputs_schema": True,
                },
            ),
        )
        terminal_print(rg, in_panel=f"Run info: [b]{operation.operation_id}[/b]")
        sys.exit(1)

    if print_help:
        rich_format_operation_help(
            obj=click_context.command,
            ctx=click_context,
            operation=operation,
            op_inputs=value_map,
            cmd_help=cmd_help,
        )
        sys.exit(0)

    return value_map


def execute_job(
    api: KiaraAPI,
    operation: Operation,
    inputs: ValueMap,
    silent: bool,
    save_results: bool,
    aliases: Union[None, Mapping[str, List[str]]],
    properties: bool = False,
) -> uuid.UUID:
    """Execute the job."""
    job_id = api.queue_job(operation=operation, inputs=inputs)

    try:
        outputs = api.get_job_result(job_id=job_id)
    except FailedJobException as fje:
        terminal_print()
        error = fje.msg
        details: Union[str, None] = KiaraException.get_root_details(fje)
        if details:
            error = f"{error}\n\n{details}"
        _error = Markdown(error)
        terminal_print(_error, in_panel="Processing error")

        sys.exit(1)
    except Exception as e:
        terminal_print()
        terminal_print(e)
        sys.exit(1)

    if not silent:
        if len(outputs) > 1:
            title = "[b]Results[/b]"
        else:
            title = "[b]Result[/b]"

        # for field_name, value in outputs.items():
        #     results.append("")
        #     results.append(f"* [b i]{field_name}[/b i]")
        #     results.append(kiara_obj.data_registry.render_data(value.value_id))

        terminal_print(
            outputs, in_panel=title, empty_line_before=True, show_data_type=True
        )

    if properties:
        render_config = {
            "show_pedigree": False,
            "show_serialized": False,
            "show_data_preview": False,
            "show_properties": True,
            "show_destinies": False,
            "show_destiny_backlinks": False,
            "show_lineage": False,
            "show_environment_hashes": False,
            "show_environment_data": False,
        }

        title = "Result details"
        format = "terminal"

        from kiara.interfaces.python_api import ValueInfo

        v_infos = (
            ValueInfo.create_from_instance(kiara=api.context, instance=v)
            for v in outputs.values()
        )

        terminal_print_model(*v_infos, format=format, in_panel=title, **render_config)

    # for k, v in outputs.items():
    #     rendered = kiara_obj.data_registry.render_data(v)
    #     rich_print(rendered)

    if save_results:
        try:

            alias_map = create_save_config(
                field_names=outputs.field_names, aliases=aliases
            )

            saved_results = api.store_values(outputs, alias_map=alias_map)

            error = False
            for field, v in saved_results.root.items():
                if v.error:
                    error = True
                    terminal_print()
                    terminal_print(
                        f"[red]Error saving result for field '{field}'[/red]: {v.error}"
                    )
            if error:
                sys.exit(1)

            api.context.job_registry.store_job_record(job_id=job_id)

            if len(saved_results) == 1:
                title = "[b]Stored result value[/b]"
            else:
                title = "[b]Stored result values[/b]"
            terminal_print(saved_results, in_panel=title, empty_line_before=True)
        except Exception as e:
            log_exception(e)
            terminal_print(f"[red]Error saving results[/red]: {e}")
            sys.exit(1)

    return job_id