Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
kiara / models / module / jobs.py
Size: Mime:
# -*- coding: utf-8 -*-

#  Copyright (c) 2021, University of Luxembourg / DHARPA project
#  Copyright (c) 2021, Markus Binsteiner
#
#  Mozilla Public License, version 2.0 (see LICENSE or https://www.mozilla.org/en-US/MPL/2.0/)

import logging
import os
import uuid
from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Mapping, Union

from pydantic import field_validator
from pydantic.fields import Field, PrivateAttr
from pydantic.main import BaseModel
from rich import box
from rich.console import RenderableType
from rich.table import Table

from kiara.exceptions import InvalidValuesException, KiaraException
from kiara.models import KiaraModel
from kiara.models.module.manifest import InputsManifest

if TYPE_CHECKING:
    from kiara.context import DataRegistry, Kiara
    from kiara.modules import KiaraModule


class ExecutionContext(KiaraModel):
    """Filesystem context in which something is executed.

    Holds the working directory (by default the process' current working
    directory at the moment the model is created) and, optionally, the path
    of the pipeline file being executed.
    """

    _kiara_model_id: ClassVar = "instance.execution_context"

    # Computed per instance via ``os.getcwd`` when no value is supplied.
    working_dir: str = Field(
        default_factory=os.getcwd,
        description="The path of the working directory.",
    )
    # Left as ``None`` unless a pipeline file is involved.
    pipeline_dir: Union[str, None] = Field(
        default=None,
        description="The path of the pipeline file that is being executed (if applicable).",
    )


class JobStatus(Enum):
    """Possible states of a job.

    Every member's value is a sentinel string shaped ``__job_<state>__``.
    """

    # Initial state; the default status of a freshly created active job.
    CREATED = "__job_created__"
    STARTED = "__job_started__"
    # Required before an active job can be turned into a job record.
    SUCCESS = "__job_success__"
    FAILED = "__job_failed__"


class LogMessage(BaseModel):
    """One log entry: when it was recorded, at which level, and its text."""

    # Naive local time (``datetime.now``) captured at instantiation.
    timestamp: datetime = Field(
        default_factory=datetime.now,
        description="The time the message was logged.",
    )
    # Numeric level, e.g. ``logging.DEBUG``.
    log_level: int = Field(description="The log level.")
    msg: str = Field(description="The log message")


class JobLog(BaseModel):
    """Collected log messages and progress indicator for a single job."""

    log: List[LogMessage] = Field(
        default_factory=list,
        description="The logs for this job.",
    )
    # -1 is the sentinel for "the module doesn't report progress".
    percent_finished: int = Field(
        default=-1,
        description="Describes how much of the job is finished. A negative number means the module does not support progress tracking.",
    )

    def add_log(self, msg: str, log_level: int = logging.DEBUG):
        """Append a message to this job's log.

        Args:
            msg: the message text.
            log_level: numeric log level, ``logging.DEBUG`` unless given.
        """
        self.log.append(LogMessage(log_level=log_level, msg=msg))


class JobConfig(InputsManifest):
    """A resolved module manifest together with the value ids of a job's inputs."""

    _kiara_model_id: ClassVar = "instance.job_config"

    @classmethod
    def create_from_module(
        cls,
        data_registry: "DataRegistry",
        module: "KiaraModule",
        inputs: Mapping[str, Any],
    ) -> "JobConfig":
        """Create the job configuration for running ``module`` with ``inputs``.

        Args:
            data_registry: used to create a value map from the (augmented)
                input data, validated against the module's full inputs schema.
            module: the module to run; its manifest must be resolved.
            inputs: the raw input data, keyed by input name.

        Returns:
            A ``JobConfig`` with the module's manifest details and the ids
            of the created input values.

        Raises:
            KiaraException: if the module's manifest is not resolved.
            InvalidValuesException: if any of the created input values are
                invalid.
        """
        # Check the manifest before touching the data registry: there is no
        # point creating input values for a job that can't be configured.
        if not module.manifest.is_resolved:
            raise KiaraException(
                msg="Cannot create job config from unresolved manifest."
            )

        augmented = module.augment_module_inputs(inputs=inputs)
        values = data_registry.create_valuemap(
            data=augmented, schema=module.full_inputs_schema
        )

        invalid = values.check_invalid()
        if invalid:
            raise InvalidValuesException(invalid_values=invalid)

        # Deliberately ``JobConfig`` rather than ``cls``: subclasses such as
        # ``JobRecord`` have required fields that are not available here.
        return JobConfig(
            module_type=module.manifest.module_type,
            module_config=module.manifest.module_config,
            is_resolved=module.manifest.is_resolved,
            inputs=values.get_all_value_ids(),
        )

    def _retrieve_data_to_hash(self) -> Any:
        # Identity of a job config: its manifest CID plus its inputs CID.
        return {"manifest": self.manifest_cid, "inputs": self.inputs_cid}


class ActiveJob(KiaraModel):
    """A submitted job, tracked with its status, timestamps, log and results."""

    _kiara_model_id: ClassVar = "instance.active_job"

    job_id: uuid.UUID = Field(description="The job id.")

    job_config: JobConfig = Field(description="The job details.")
    status: JobStatus = Field(
        description="The current status of the job.", default=JobStatus.CREATED
    )
    job_log: JobLog = Field(description="The job log.")
    submitted: datetime = Field(
        description="When the job was submitted.", default_factory=datetime.now
    )
    started: Union[datetime, None] = Field(
        description="When the job was started.", default=None
    )
    finished: Union[datetime, None] = Field(
        description="When the job was finished.", default=None
    )
    # Maps result field names to the ids of the produced values.
    results: Union[Dict[str, uuid.UUID], None] = Field(
        description="The result(s).", default=None
    )
    error: Union[str, None] = Field(
        description="Potential error message.", default=None
    )
    # Private (not serialized) slot for an exception object; exposed read-only
    # through the ``exception`` property.
    _exception: Union[Exception, None] = PrivateAttr(default=None)

    def is_finished(self) -> bool:
        """Return ``True`` once a finish time has been recorded."""
        return self.finished is not None

    def _retrieve_id(self) -> str:
        # The model id is the canonical string form of the job UUID.
        return str(self.job_id)

    def _retrieve_data_to_hash(self) -> Any:
        # Only the job id's raw bytes contribute to this model's hash.
        return self.job_id.bytes

    @property
    def exception(self) -> Union[Exception, None]:
        """The exception stored for this job, if any."""
        return self._exception

    @property
    def runtime(self) -> Union[float, None]:
        """Seconds between ``started`` and ``finished``; ``None`` if either is unset."""
        if self.started is None or self.finished is None:
            return None

        return (self.finished - self.started).total_seconds()


class JobRuntimeDetails(BaseModel):
    """Timing information and the log of a job run."""

    job_log: JobLog = Field(description="The job log.")
    submitted: datetime = Field(description="When the job was submitted.")
    started: datetime = Field(description="When the job was started.")
    finished: datetime = Field(description="When the job was finished.")
    runtime: float = Field(description="The duration of the job (in seconds).")

    def create_renderable(self, **config: Any) -> RenderableType:
        """Render these details as a key/value table.

        The job log is shown as a nested table of (timestamp, message) rows.
        ``config`` is accepted for interface compatibility and is not used.
        """
        # Log messages are arbitrary text: escape them so that square brackets
        # are not interpreted as rich markup (which would silently drop text
        # like "[a, b]" or raise a MarkupError on e.g. "[/x]").
        from rich.markup import escape

        table = Table(show_header=False, box=box.SIMPLE)
        table.add_column("Key", style="i")
        table.add_column("Value")

        table.add_row("submitted", str(self.submitted))
        table.add_row("started", str(self.started))
        table.add_row("finished", str(self.finished))
        table.add_row("runtime", f"{self.runtime} seconds")

        job_log_table = Table(show_header=False, box=box.SIMPLE)
        job_log_table.add_column("timestamp", style="i")
        job_log_table.add_column("message")
        for log in self.job_log.log:
            job_log_table.add_row(str(log.timestamp), escape(log.msg))

        table.add_row("job log", job_log_table)

        return table


class JobRecord(JobConfig):
    """Record of a successfully finished job.

    Extends the job configuration (manifest and inputs) with the job id, the
    output value ids, runtime details and environment hashes.
    """

    _kiara_model_id: ClassVar = "instance.job_record"

    @classmethod
    def from_active_job(cls, kiara: "Kiara", active_job: ActiveJob):
        """Create a record from a successfully finished active job.

        Args:
            kiara: the kiara context; its data registry is used to compute the
                inputs data CID, its environment registry supplies the
                environment hashes.
            active_job: the job to record; it must have status
                ``JobStatus.SUCCESS`` and its ``results`` must be set.

        Returns:
            A new instance of ``cls``.
        """
        # Internal invariants: only successful jobs with results get recorded.
        assert active_job.status == JobStatus.SUCCESS
        assert active_job.results is not None

        # A successful job is expected to have start/finish times set.
        job_details = JobRuntimeDetails(
            job_log=active_job.job_log,
            submitted=active_job.submitted,
            started=active_job.started,  # type: ignore
            finished=active_job.finished,  # type: ignore
            runtime=active_job.runtime,  # type: ignore
        )

        inputs_data_cid = active_job.job_config.calculate_inputs_data_cid(
            data_registry=kiara.data_registry
        )

        return cls(
            job_id=active_job.job_id,
            module_type=active_job.job_config.module_type,
            module_config=active_job.job_config.module_config,
            is_resolved=active_job.job_config.is_resolved,
            inputs=active_job.job_config.inputs,
            outputs=active_job.results,
            runtime_details=job_details,
            environment_hashes=kiara.environment_registry.environment_hashes,
            inputs_data_hash=(
                str(inputs_data_cid) if inputs_data_cid is not None else None
            ),
        )

    job_id: uuid.UUID = Field(description="The globally unique id for this job.")
    environment_hashes: Mapping[str, Mapping[str, str]] = Field(
        description="Hashes for the environments this value was created in."
    )
    # NOTE: the field name is misspelled ("enviroments"); renaming it would
    # break existing callers and serialized data, so it is kept as-is.
    enviroments: Union[Mapping[str, Mapping[str, Any]], None] = Field(
        description="Information about the environments this value was created in.",
        default=None,
    )
    inputs_data_hash: Union[str, None] = Field(
        description="The hash of this job's input data."
    )

    outputs: Dict[str, uuid.UUID] = Field(description="References to the job outputs.")
    runtime_details: Union[JobRuntimeDetails, None] = Field(
        description="Runtime details for the job."
    )
    job_metadata: Mapping[str, Any] = Field(
        description="Optional metadata for this job.", default_factory=dict
    )

    # ``None`` means the storage state has not been determined.
    _is_stored: Union[bool, None] = PrivateAttr(default=None)
    _outputs_hash: Union[int, None] = PrivateAttr(default=None)

    @field_validator("job_metadata", mode="before")
    @classmethod
    def validate_metadata(cls, value):
        """Treat an explicit ``None`` for ``job_metadata`` as an empty mapping."""
        if value is None:
            value = {}
        return value

    def _retrieve_data_to_hash(self) -> Any:
        # Identity: manifest CID, inputs CID and the raw bytes of each output id.
        return {
            "manifest": self.manifest_cid,
            "inputs": self.inputs_cid,
            "outputs": {k: v.bytes for k, v in self.outputs.items()},
        }

    def create_renderable(self, **config: Any) -> RenderableType:
        """Render this record as a key/value table.

        Config:
            include: optional collection of field names; when given, only
                those fields are shown. The "job hash" and "inputs hash" rows
                are always appended.
        """
        from kiara.utils.output import extract_renderable

        include = config.get("include", None)

        table = Table(show_header=False, box=box.SIMPLE)
        table.add_column("Key", style="i")
        table.add_column("Value")
        # Access ``model_fields`` on the class: instance access is deprecated
        # in recent pydantic v2 releases.
        for k in type(self).model_fields.keys():
            if include is not None and k not in include:
                continue
            table.add_row(k, extract_renderable(getattr(self, k)))
        table.add_row("job hash", self.job_hash)
        table.add_row("inputs hash", self.inputs_hash)
        return table