Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# -*- coding: utf-8 -*-
import ast
import inspect
import os
import typing
from ast import Attribute, Call, Constant, Expr, Import, ImportFrom
from pathlib import Path

import streamlit as st
from kiara.defaults import DEFAULT_NO_DESC_VALUE
from kiara.metadata.core_models import DocumentationMetadataModel
from kiara.utils import camel_case_to_snake_case
from pydantic import validate_arguments
from pydantic.decorator import ValidatedFunction
from pydantic.main import BaseModel
from streamlit.delta_generator import DeltaGenerator

from kiara_streamlit.components import KiaraComponentMixin
from kiara_streamlit.components.file import KiaraFileComponentsMixin
from kiara_streamlit.components.module import KiaraModuleComponentsMixin
from kiara_streamlit.components.onboarding import KiaraOnboardingComponentsMixin
from kiara_streamlit.components.operations import KiaraOperationComponentsMixin
from kiara_streamlit.components.pipeline import KiaraPipelineComponentsMixin
from kiara_streamlit.components.processing import KiaraProcessingComponentsMixin
from kiara_streamlit.components.tabular_data import TableComponentsMixin
from kiara_streamlit.components.value_info import KiaraValueInfoComponentsMixin
from kiara_streamlit.components.value_input import KiaraInputComponentsMixin
from kiara_streamlit.defaults import EXAMPLE_BASE_DIR, TEMPLATES_BASE_DIR
from kiara_streamlit.utils import render_example_template


def _extract_component_class_name(cls: typing.Type[KiaraComponentMixin]):

    if hasattr(cls, "_type_name"):
        return cls._type_name  # type: ignore

    result = camel_case_to_snake_case(cls.__name__)
    if result.endswith("_mixin"):
        result = result[0:-6]
    if result.endswith("_components"):
        result = result[0:-11]

    if result.startswith("kiara_"):
        result = result[6:]
    return result


class EmbeddExampleTransformer(ast.NodeTransformer):
    def __init__(self):

        self._lines_to_remove: typing.Set[int] = set()

    @property
    def lines_to_remove(self) -> typing.Set[int]:
        return self._lines_to_remove

    def visit_ImportFrom(self, node: ImportFrom) -> typing.Any:

        if (
            node.module == "kiara_streamlit"
            and len(node.names) == 1
            and node.names[0].name == "KiaraStreamlit"
        ):
            self._lines_to_remove.add(node.lineno)
        return None

    def visit_Import(self, node: Import) -> typing.Any:
        if (
            len(node.names) == 1
            and node.names[0].name == "streamlit"
            and node.names[0].asname == "st"
        ):
            self._lines_to_remove.add(node.lineno)
        elif len(node.names) == 1 and node.names[0].name == "kiara_streamlit":
            self._lines_to_remove.add(node.lineno)
        return node

    def visit_Expr(self, node: Expr) -> typing.Any:

        if isinstance(node.value, Constant) and isinstance(node.value.value, str):
            lno = node.lineno
            elno = node.end_lineno
            assert lno is not None
            assert elno is not None
            for i in range(lno, elno + 1):
                self._lines_to_remove.add(i)
        elif isinstance(node.value, Call):
            if isinstance(node.value.func, Attribute) and node.value.func.attr == "init" and node.value.func.value.id == "kiara_streamlit":  # type: ignore
                self._lines_to_remove.add(node.lineno)
        return node


class ExampleCode(object):
    @classmethod
    def find_example_files(
        cls, examples_base_dir: str
    ) -> typing.Dict[str, typing.Dict[str, str]]:

        if not os.path.isdir(examples_base_dir):
            return {}

        result: typing.Dict[str, typing.Dict[str, str]] = {}
        for root, dirnames, filenames in os.walk(examples_base_dir, topdown=True):

            for f in filenames:
                full = os.path.join(root, f)
                if os.path.isfile(full) and f.endswith(".py"):
                    rel_path = os.path.relpath(os.path.dirname(full), examples_base_dir)
                    result.setdefault(rel_path.replace(os.path.sep, "."), {})[
                        f[0:-3]
                    ] = full

        return result

    @classmethod
    def find_examples(
        cls, examples_base_dir: str
    ) -> typing.Dict[str, typing.Dict[str, "ExampleCode"]]:

        files = cls.find_example_files(examples_base_dir=examples_base_dir)

        result: typing.Dict[str, typing.Dict[str, ExampleCode]] = {}
        for category, examples in files.items():
            for example_name, example_file in examples.items():
                ex_code = cls.load(
                    example_file=example_file,
                    category_name=category,
                    example_name=example_name,
                )
                result.setdefault(category, {})[example_name] = ex_code

        return result

    @classmethod
    def load(
        cls,
        example_file: str,
        category_name: typing.Optional[str] = None,
        example_name: typing.Optional[str] = None,
    ) -> "ExampleCode":

        with open(example_file, "r") as source:
            source_code = source.read()

        return ExampleCode(
            category_name=category_name,
            example_name=example_name,
            full_source_code=source_code,
            path=example_file,
        )

    def __init__(
        self,
        full_source_code: str,
        category_name: typing.Optional[str] = None,
        example_name: typing.Optional[str] = None,
        path: typing.Optional[str] = None,
    ):

        self._full_source_code: str = full_source_code

        self._category_name: typing.Optional[str] = category_name
        self._example_name: typing.Optional[str] = example_name

        self._path: typing.Optional[str] = path

        self._minimal_source_code: typing.Optional[str] = None
        self._doc: typing.Optional[DocumentationMetadataModel] = None

    @property
    def minimal_source_code(self) -> str:

        if self._minimal_source_code is None:
            self._parse_source_code()
        return self._minimal_source_code  # type: ignore

    @property
    def doc(self) -> DocumentationMetadataModel:

        if self._doc is None:
            self._parse_source_code()
        return self._doc  # type: ignore

    def _parse_source_code(self) -> None:

        tree = ast.parse(self._full_source_code)

        doc = ast.get_docstring(tree)
        if not doc:
            doc = DEFAULT_NO_DESC_VALUE

        self._doc = DocumentationMetadataModel.from_string(doc)

        transformer = EmbeddExampleTransformer()
        transformer.visit(tree)

        new_code_lines = []

        first_line = False
        last_line_empty = False
        for lineno, line in enumerate(self._full_source_code.split("\n"), start=1):

            if lineno in transformer.lines_to_remove:
                continue

            if (not line or line.startswith("# ")) and not first_line:
                continue

            first_line = True
            line_empty = True if not line.strip() else False
            if line_empty and last_line_empty:
                continue

            new_code_lines.append(line)
            last_line_empty = line_empty

        _new_lines = "\n".join(new_code_lines)
        self._minimal_source_code = _new_lines


class AllComponentsMixin(
    KiaraFileComponentsMixin,
    KiaraOperationComponentsMixin,
    KiaraOnboardingComponentsMixin,
    KiaraProcessingComponentsMixin,
    KiaraPipelineComponentsMixin,
    KiaraModuleComponentsMixin,
    TableComponentsMixin,
    KiaraValueInfoComponentsMixin,
    KiaraInputComponentsMixin,
):
    pass


class ComponentMgmt(object):
    def __init__(self, example_base_dir: typing.Optional[str] = None):

        if example_base_dir is None:
            example_base_dir = EXAMPLE_BASE_DIR

        self._example_base_dir: str = example_base_dir

        self._components_cls: typing.Type[AllComponentsMixin] = AllComponentsMixin

        # self._all_mixin_classes: typing.Dict[str, typing.Type[KiaraComponentMixin]] = {
        #     _extract_component_class_name(cls): cls
        #     for cls in _get_all_subclasses(KiaraComponentMixin, ignore_abstract=True)
        # }

        subclasses: typing.Iterable[
            typing.Type[typing.Any]
        ] = self._components_cls.__bases__

        self._all_mixin_classes: typing.Dict[str, typing.Type[KiaraComponentMixin]] = {
            _extract_component_class_name(cls): cls
            for cls in subclasses
            if issubclass(cls, KiaraComponentMixin)
        }

        self._components_by_collection: typing.Optional[
            typing.Dict[str, typing.Dict[str, BaseModel]]
        ] = None
        self._components_index: typing.Optional[
            typing.Dict[str, typing.Tuple[str, str]]
        ] = None
        self._docs: typing.Dict[str, typing.Dict[str, DocumentationMetadataModel]] = {}
        self._examples: typing.Optional[
            typing.Dict[str, typing.Dict[str, ExampleCode]]
        ] = None

    @property
    def component_collections(self) -> typing.Iterable[str]:
        return sorted(self._all_mixin_classes.keys())

    @property
    def all_components_by_collection(
        self,
    ) -> typing.Mapping[str, typing.Mapping[str, BaseModel]]:

        if self._components_by_collection is not None:
            return self._components_by_collection

        funcs: typing.Dict[str, typing.Dict[str, BaseModel]] = {}
        index: typing.Dict[str, typing.Tuple[str, str]] = {}
        for component_collection in sorted(self._all_mixin_classes.keys()):

            cls = self._all_mixin_classes[component_collection]

            functions = self._get_mixin_functions(cls)
            for alias in functions.keys():
                if alias in index.keys():
                    raise Exception(f"Duplicate component name: {alias}.")
                index[alias] = (component_collection, alias)

            funcs[component_collection] = functions

        self._components_by_collection = funcs
        self._components_index = index

        return self._components_by_collection

    @property
    def components_index(self) -> typing.Mapping[str, typing.Tuple[str, str]]:

        if self._components_index is None:
            self.all_components_by_collection  # type: ignore
        return self._components_index  # type: ignore

    @property
    def component_names(self) -> typing.Iterable[str]:

        return self.components_index.keys()

    @property
    def examples(self) -> typing.Mapping[str, typing.Mapping[str, ExampleCode]]:

        if self._examples is None:
            self._examples = ExampleCode.find_examples(
                examples_base_dir=self._example_base_dir
            )
        return self._examples

    def get_components_of_collection(
        self, component_collection: str
    ) -> typing.Mapping[str, BaseModel]:

        return self.all_components_by_collection[component_collection]

    def get_examples_for_category(
        self, example_category: str
    ) -> typing.Mapping[str, ExampleCode]:

        return self.examples.get(example_category, {})

    def get_example(
        self, example_category: str, example_name: str
    ) -> typing.Optional[ExampleCode]:

        examples = self.get_examples_for_category(example_category=example_category)

        example = examples.get(example_name, None)
        return example

    @validate_arguments
    def _get_mixin_functions(
        self, cls: typing.Type[KiaraComponentMixin]
    ) -> typing.Dict[str, BaseModel]:
        class Config:
            arbitrary_types_allowed = True

        result: typing.Dict[str, BaseModel] = {}
        for attr_name in dir(cls):

            if attr_name.startswith("_"):
                continue
            attr = getattr(cls, attr_name)
            if not callable(attr):
                continue
            model = self._analyse_component_attr(attr, config=Config)
            if not model:
                continue

            result[attr_name] = model
        return result

    def _analyse_component_attr(
        self, func: typing.Callable, config: typing.Type["Config"]  # type: ignore  # noqa
    ) -> typing.Optional["DecoratorBaseModel"]:  # type: ignore  # noqa

        vd = ValidatedFunction(func, config=config)
        fields = vd.model.__fields__
        if "container" not in fields.keys():
            return None

        return vd.model

    def get_component_from_collection(
        self, component_collection: str, component_name: str
    ) -> typing.Callable:

        cat = self._all_mixin_classes.get(component_collection, None)
        if not cat:
            raise Exception(
                f"No component collection '{component_collection}' available."
            )

        if not hasattr(cat, component_name):
            raise Exception(
                f"No component '{component_name}' in collection '{component_collection}' available."
            )

        return getattr(cat, component_name)

    def get_component(self, component_name: str) -> typing.Callable:

        _temp = self.components_index.get(component_name, None)
        if not _temp:
            raise Exception(f"No component with name '{component_name}' available.")

        func = self.get_component_from_collection(
            component_collection=_temp[0], component_name=_temp[1]
        )
        return func

    def _get_component_model(
        self, component_collection: str, func_name: str
    ) -> BaseModel:

        cat = self._all_mixin_classes.get(component_collection, None)
        if not cat:
            raise Exception(
                f"No component category '{component_collection}' available."
            )

        if not hasattr(cat, func_name):
            raise Exception(
                f"No component '{func_name}' in collection '{component_collection}' available."
            )

        return self.all_components_by_collection[component_collection][func_name]

    def get_doc_for_component(
        self, component_collection: str, component_name: str
    ) -> DocumentationMetadataModel:

        if not self._docs.get(component_collection, {}).get(component_name, None):

            func = self.get_component_from_collection(
                component_collection=component_collection, component_name=component_name
            )
            doc = func.__doc__
            if not doc:
                doc = DEFAULT_NO_DESC_VALUE
            doc = inspect.cleandoc(doc)  # type: ignore

            _doc = DocumentationMetadataModel.from_string(doc)
            self._docs.setdefault(component_collection, {})[component_name] = _doc

        return self._docs[component_collection][component_name]

    def is_input_component(self, component_collection: str, component_name: str):

        model = self._get_component_model(
            component_collection=component_collection, func_name=component_name
        )
        return "key" in model.__fields__.keys()

    def render_component_description(
        self,
        component_collection: str,
        component_name: str,
        full_doc: bool = True,
        container: DeltaGenerator = st,
    ):

        component = self.get_doc_for_component(
            component_collection=component_collection, component_name=component_name
        )
        if full_doc:
            doc = component.full_doc
        else:
            doc = component.description
        container.write(doc)

    def render_component_args_doc(
        self,
        component_collection: str,
        component_name: str,
        container: DeltaGenerator = st,
    ):

        model = self._get_component_model(
            component_collection=component_collection, func_name=component_name
        )

        md = "| field | type | required | default | desc |"
        md = f"{md}\n| --- | --- | --- | --- | --- |"

        fields_required = {}
        fields_optional = {}
        for field_name in sorted(model.__fields__.keys()):

            if field_name in [
                "v__duplicate_kwargs",
                "self",
                "args",
                "kwargs",
                "container",
            ]:
                continue
            details = model.__fields__[field_name]
            if details.required:
                if details.type_ == typing.Any:
                    fields_optional[field_name] = details
                else:
                    fields_required[field_name] = details
            else:
                fields_optional[field_name] = details

        fields_required.update(fields_optional)
        for field_name, details in fields_required.items():

            _type = extract_type_name(details.type_)

            desc = details.field_info.description

            if not desc:
                desc = DEFAULT_NO_DESC_VALUE

            if field_name == "default" and desc == DEFAULT_NO_DESC_VALUE:
                desc = "A default value to use in the input component (if applicable)."
            elif field_name == "label" and desc == DEFAULT_NO_DESC_VALUE:
                desc = "A label for the component. Will be forwarded to the underlying streamlit component(s)."
            elif field_name == "key" and desc == DEFAULT_NO_DESC_VALUE:
                desc = "Will be forwarded to underlying streamlit component(s): an optional string to use as the unique key for the widget. If this is omitted, a key will be generated for the widget based on its content. Multiple widgets of the same type may not share the same key."

            if details.required:
                if details.type_ == typing.Any:
                    req = "no"
                else:
                    req = "yes"
            else:
                req = "no"

            default = details.default
            if default is None:
                default = ""
            md = f"{md}\n| {field_name} | {_type} | {req} | {default} | {desc} |"

        container.markdown(md)

    def render_component_doc(
        self,
        component_collection: str,
        component_name: str,
        incl_component_code: bool = True,
        container: DeltaGenerator = st,
    ):

        example_category = f"component_examples.{component_collection}"

        container.markdown(f"### ***{component_name}***")
        self.render_component_description(
            component_collection=component_collection, component_name=component_name
        )
        arg_expander = container.expander("Supported arguments", expanded=False)
        self.render_component_args_doc(
            component_collection=component_collection,
            component_name=component_name,
            container=arg_expander,
        )
        if incl_component_code:
            component_func = self.get_component_from_collection(
                component_collection=component_collection, component_name=component_name
            )
            code_expander = container.expander("Component code", expanded=False)
            code_expander.markdown(
                f"``` python\n{inspect.getsource(component_func)}\n```"
            )
        container.markdown("##### Examples")
        examples = self.get_examples_for_category(
            example_category=f"{example_category}.{component_name}"
        )
        if not examples:
            container.markdown("###### How to use")
            model = self._get_component_model(
                component_collection=component_collection, func_name=component_name
            )
            if "key" in model.__fields__.keys():
                container.markdown(
                    f"``` python\nresult = st.kiara.{component_name}(...)\nst.write(result)\n```"
                )
            else:
                container.markdown(f"``` python\nst.kiara.{component_name}(...)\n```")
            return
        for example_name, example in examples.items():
            container.markdown(f"###### {example.doc.description}")
            if example.doc.doc:
                container.markdown(example.doc.doc)
            container.markdown(f"``` python\n{example.minimal_source_code}\n```")
            try:
                exec(example.minimal_source_code, {"st": st})
            except Exception as e:
                container.error(
                    f"Can't render example component, error in example code: {e}"
                )
            container.markdown("---")

    def create_component_example_file_from_template(
        self,
        component_collection: str,
        component_name: str,
        example_name: typing.Optional[str] = None,
        example_doc: typing.Optional[str] = None,
        example_args: typing.Optional[str] = None,
        template: typing.Union[None, str, Path] = None,
    ):

        if example_name is None:
            example_name = f"{component_name}_1"

        example_name = f"{example_name}.py"

        example_file_path = os.path.join(
            self._example_base_dir,
            "component_examples",
            component_collection,
            component_name,
            example_name,
        )
        if os.path.exists(example_file_path):
            raise Exception(
                f"Can't render example from template, path exists: {example_file_path}"
            )

        os.makedirs(os.path.dirname(example_file_path), exist_ok=True)

        if not template:
            if self.is_input_component(
                component_collection=component_collection, component_name=component_name
            ):
                template = os.path.join(
                    TEMPLATES_BASE_DIR, "component_example_input.py.j2"
                )
            else:
                template = os.path.join(TEMPLATES_BASE_DIR, "component_example.py.j2")
        content = render_example_template(
            component_name=component_name,
            example_doc=example_doc,
            example_args=example_args,
            template=template,
        )

        with open(example_file_path, "wt") as f:
            f.write(content)

        return example_file_path

    def create_missing_example_files(
        self, *component_collections: str
    ) -> typing.Dict[str, typing.Dict[str, typing.Dict[str, str]]]:

        if not component_collections:
            _component_collections: typing.Iterable[str] = self.component_collections
        else:
            _component_collections = component_collections

        result: typing.Dict[str, typing.Dict[str, typing.Dict[str, str]]] = {}

        for component_collection in _component_collections:
            for component_name in self.get_components_of_collection(
                component_collection
            ).keys():
                examples_dir_path = os.path.join(
                    self._example_base_dir,
                    "component_examples",
                    component_collection,
                    component_name,
                )
                os.makedirs(examples_dir_path, exist_ok=True)
                if os.listdir(examples_dir_path):
                    print(
                        f"Not adding example for component '{component_name}': examples dir exists and not empty."
                    )
                    continue

                example_name = f"{component_name}_1"
                example_file = self.create_component_example_file_from_template(
                    component_collection=component_collection,
                    component_name=component_name,
                    example_name=example_name,
                )
                result.setdefault(component_collection, {}).setdefault(
                    component_name, {}
                )[example_name] = example_file

        return result


def extract_type_name(item: typing.Any) -> str:

    if isinstance(item, type):
        item = item.__name__
    elif item == typing.Any:
        item = "any"
    elif isinstance(item, typing._UnionGenericAlias):  # type: ignore
        result = []
        for arg in item.__args__:
            result.append(extract_type_name(arg))
        item = " OR ".join(result)
    else:
        item = str(item)

    item = item.replace("typing.Mapping", "dict")
    item = item.replace("typing.Iterable", "list")
    item = item.replace("typing.Union", "union")
    item = item.replace("kiara.data.values.Value", "Value")
    item = item.replace("NoneType", "None")

    return item