Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
kiara / interfaces / cli / run.py
Size: Mime:
# -*- coding: utf-8 -*-

#  Copyright (c) 2021, University of Luxembourg / DHARPA project
#  Copyright (c) 2021, Markus Binsteiner
#
#  Mozilla Public License, version 2.0 (see LICENSE or https://www.mozilla.org/en-US/MPL/2.0/)

"""The 'run' subcommand for the cli."""

import os.path
import sys
import typing
from pathlib import Path
from typing import Any, Dict, Iterable, List, Mapping, Union

import rich_click as click

from kiara.exceptions import InvalidCommandLineInvocation
from kiara.utils import log_message
from kiara.utils.cli import dict_from_cli_args, terminal_print
from kiara.utils.cli.exceptions import handle_exception
from kiara.utils.files import get_data_from_file

if typing.TYPE_CHECKING:
    from kiara.api import KiaraAPI


@click.command()
@click.argument("module_or_operation", nargs=1, metavar="MODULE_OR_OPERATION")
@click.argument("inputs", nargs=-1, required=False)
@click.option(
    "--module-config",
    "-c",
    required=False,
    help="(Optional) module configuration, only valid when run target is a module name.",
    multiple=True,
)
@click.option(
    "--explain",
    "-e",
    help="Display information about the selected operation and exit.",
    is_flag=True,
)
@click.option(
    "--output", "-o", help="The output format and configuration.", multiple=True
)
@click.option(
    "--save",
    "-s",
    help="Save one or several of the outputs of this run. If the argument contains a '=', the format is [output_name]=[alias], if not, the values will be saved as '[alias]-[output_name]'.",
    required=False,
    multiple=True,
)
@click.option(
    "--print-properties",
    "-p",
    help="Also display the properties of the result values.",
    required=False,
    is_flag=True,
)
@click.option("--help", "-h", help="Show this message and exit.", is_flag=True)
@click.pass_context
@handle_exception()
def run(
    ctx,
    module_or_operation: str,
    module_config: Iterable[str],
    inputs: Iterable[str],
    output: Iterable[str],
    explain: bool,
    save: Iterable[str],
    print_properties: bool,
    help: bool,
):
    """Run a kiara operation."""
    from kiara.api import JobDesc, RunSpec
    from kiara.utils.cli.run import (
        _validate_save_option,
        calculate_aliases,
        execute_job,
        set_and_validate_inputs,
        validate_operation_in_terminal,
    )
    from kiara.utils.output import OutputDetails

    # =========================================================================
    # initialize a few variables

    if module_config:
        module_config = dict_from_cli_args(*module_config)
    else:
        module_config = {}

    _validate_save_option(save)

    output_details = OutputDetails.from_data(output)
    silent = False
    if output_details.format == "silent":
        silent = True

    force_overwrite = output_details.config.get("force", False)

    if output_details.target != "terminal":
        if output_details.target == "file":
            target_dir = Path.cwd()
            target_file = target_dir / f"{module_or_operation}.{output_details.format}"
        else:
            target_file = Path(
                os.path.realpath(os.path.expanduser(output_details.target))
            )

        if target_file.exists() and not force_overwrite:
            terminal_print()
            terminal_print(
                f"Can't run workflow, the target files already exist, and '--output force=true' not specified: {target_file}"
            )

            sys.exit(1)

    api: KiaraAPI = ctx.obj.kiara_api  # type: ignore

    cmd_arg = ctx.params["module_or_operation"]
    cmd_help = f"[yellow bold]Usage: [/yellow bold][bold]kiara run [OPTIONS] [i]{cmd_arg}[/i] [INPUTS][/bold]"

    # if module_config:
    #     op: Union[str, Mapping[str, Any]] = {
    #         "module_type": module_or_operation,
    #         "module_config": module_config,
    #     }
    # else:
    #     op = module_or_operation

    # base_inputs: Union[Mapping[str, Any], None] = None
    # extra_save: Union[None, Dict[str, str]] = None
    run_type = None
    job_descs: List[JobDesc] = []

    if not module_config and os.path.isfile(module_or_operation):

        path: Path = Path(module_or_operation)
        data = get_data_from_file(path)
        repl_dict: Dict[str, Any] = {"this_dir": path.parent.absolute().as_posix()}
        alias = path.stem

        if isinstance(data, list):
            raise NotImplementedError()
        elif isinstance(data, Mapping):
            if "operation" in data.keys():
                run_type = "job"
                job_desc = JobDesc.create_from_data(
                    data, var_repl_dict=repl_dict, alias=alias
                )
                job_descs.append(job_desc)

            elif "jobs" in data.keys():
                run_type = "run"

                if inputs:
                    terminal_print()
                    terminal_print(
                        "Can't specify inputs when running file with a run spec."
                    )
                    sys.exit(1)

                run_desc = RunSpec.create_from_data(
                    data, var_repl_dict=repl_dict, alias=alias
                )
                job_descs.extend(run_desc.jobs)
            elif "steps" not in data.keys():

                terminal_print()
                terminal_print(
                    f"Can't run file '{path}', it does not contain a valid pipeline, job or run specification."
                )
                sys.exit(1)
            else:
                # TODO: check if valid pipeline, otherwise check if 'module_or_operation is an operation name

                from kiara.models.module.jobs import ExecutionContext
                from kiara.models.module.pipeline import PipelineConfig

                pipeline_dir = os.path.abspath(os.path.dirname(path))
                execution_context = ExecutionContext(pipeline_dir=pipeline_dir)
                pc = PipelineConfig.from_config(
                    data, execution_context=execution_context
                )
                job_desc = JobDesc(
                    operation="pipeline",
                    module_config=pc.model_dump(),
                    job_alias="local_pipeline",
                )
                job_descs.append(job_desc)

    else:
        if module_config:
            job_desc = JobDesc(
                operation=module_or_operation,
                module_config=module_config,
                job_alias="default",
            )
        else:
            job_desc = JobDesc(operation=module_or_operation, job_alias="default")

        job_descs.append(job_desc)

    assert len(job_descs) > 0

    for job_desc in job_descs:

        if job_desc.module_config:
            op: Union[str, Mapping[str, Any]] = {
                "module_type": job_desc.operation,
                "module_config": job_desc.module_config,
            }
        else:
            op = job_desc.operation

        try:
            kiara_op = validate_operation_in_terminal(api=api, module_or_operation=op)
        except InvalidCommandLineInvocation as e:
            ctx.obj.exit(msg=None, exit_code=e.error_code)
            return

        log_message(f"run_arg.is.{run_type}")

        final_aliases = calculate_aliases(
            operation=kiara_op, alias_tokens=save, extra_aliases=job_desc.save
        )

        try:
            inputs_value_map = set_and_validate_inputs(
                api=api,
                operation=kiara_op,
                inputs=inputs,
                explain=explain,
                print_help=help,
                click_context=ctx,
                cmd_help=cmd_help,
                base_inputs=job_desc.inputs,
            )
            if inputs_value_map is None:
                ctx.obj.exit(msg=None, exit_code=0)
                return
        except InvalidCommandLineInvocation as e:
            ctx.obj.exit(msg=None, exit_code=e.error_code)
            return

        execute_job(
            api=api,
            operation=kiara_op,
            inputs=inputs_value_map,
            silent=silent,
            save_results=bool(final_aliases),
            aliases=final_aliases,
            properties=print_properties,
        )