Repository URL to install this package:
|
Version:
0.5.10.dev4+gc28d68fe2 ▾
|
# -*- coding: utf-8 -*-
# Mozilla Public License, version 2.0 (see LICENSE or https://www.mozilla.org/en-US/MPL/2.0/)
import inspect
import json
import os.path
import sys
import textwrap
import uuid
from functools import cached_property
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Dict,
Iterable,
List,
Mapping,
MutableMapping,
Type,
Union,
)
import dpath
import structlog
from ruamel.yaml import YAML
from kiara.defaults import (
OFFICIAL_KIARA_PLUGINS,
VALID_VALUE_QUERY_CATEGORIES,
VALUE_ATTR_DELIMITER,
)
from kiara.exceptions import (
DataTypeUnknownException,
KiaraException,
NoSuchExecutionTargetException,
NoSuchWorkflowException,
)
from kiara.interfaces.python_api.models.info import (
DataTypeClassesInfo,
DataTypeClassInfo,
KiaraPluginInfo,
KiaraPluginInfos,
ModuleTypeInfo,
ModuleTypesInfo,
OperationGroupInfo,
OperationInfo,
OperationTypeInfo,
RendererInfos,
ValueInfo,
ValuesInfo,
)
from kiara.interfaces.python_api.models.job import JobDesc
from kiara.interfaces.python_api.value import StoreValueResult, StoreValuesResult
from kiara.models.context import ContextInfo, ContextInfos
from kiara.models.module.jobs import ActiveJob
from kiara.models.module.manifest import Manifest
from kiara.models.module.operation import Operation
from kiara.models.rendering import RenderValueResult
from kiara.models.runtime_environment.python import PythonRuntimeEnvironment
from kiara.models.values.matchers import ValueMatcher
from kiara.models.values.value import (
PersistedData,
Value,
ValueMapReadOnly,
ValueSchema,
)
from kiara.models.workflow import WorkflowGroupInfo, WorkflowInfo, WorkflowMetadata
from kiara.operations import OperationType
from kiara.operations.included_core_operations.filter import FilterOperationType
from kiara.operations.included_core_operations.pipeline import PipelineOperationDetails
from kiara.operations.included_core_operations.pretty_print import (
PrettyPrintOperationType,
)
from kiara.operations.included_core_operations.render_value import (
RenderValueOperationType,
)
from kiara.registries.environment import EnvironmentRegistry
from kiara.registries.ids import ID_REGISTRY
from kiara.registries.operations import OP_TYPE
from kiara.renderers import KiaraRenderer
from kiara.utils import log_exception, log_message
from kiara.utils.downloads import get_data_from_url
from kiara.utils.files import get_data_from_file
from kiara.utils.operations import create_operation
from kiara.utils.string_vars import replace_var_names_in_obj
if TYPE_CHECKING:
from kiara.context import Kiara, KiaraConfig, KiaraRuntimeConfig
from kiara.interfaces.python_api.models.doc import (
OperationsMap,
PipelinesMap,
WorkflowsMap,
)
from kiara.interfaces.python_api.workflow import Workflow
from kiara.models.module.pipeline import PipelineConfig, PipelineStructure
from kiara.models.module.pipeline.pipeline import PipelineGroupInfo, PipelineInfo
# Module-level structlog logger.
logger = structlog.getLogger()
# Shared ruamel.yaml parser using the "safe" loader type; the API methods in this module use it
# to parse configuration strings that could not be parsed as JSON.
yaml = YAML(typ="safe")
class KiaraAPI(object):
"""
Public API for clients.
This class wraps a [Kiara][kiara.context.kiara.Kiara] instance, and allows easy access to tasks that are
typically done by a frontend. The return types of each method are json serializable in most cases.
Can be extended for special scenarios and augmented with scenario-specific methods (Jupyter, web-frontend, ...)
The naming of the API endpoints follows a (loose-ish) convention:
- list_*: return a list of ids or items, if items, filtering is supported
- get_*: get specific instances of a type (operation, value, etc.)
- retrieve_*: get augmented information about an instance or type of something. This usually implies that there is some overhead,
so before you use this, make sure that there is no 'get_*' or 'list_*' endpoint that could give you what you need.
.
"""
# Lazily created default API object; populated and returned by the 'instance()' classmethod.
_default_instance: ClassVar[Union["KiaraAPI", None]] = None
# Per-context-name API objects. NOTE(review): not referenced anywhere in the visible part of this
# class ('instance()' raises NotImplementedError for named contexts) -- presumably reserved for later use.
_context_instances: ClassVar[Dict[str, "KiaraAPI"]] = {}
@classmethod
def instance(cls, context_name: Union[str, None] = None) -> "KiaraAPI":
if context_name is None:
if cls._default_instance is not None:
return cls._default_instance
from kiara.context import KiaraConfig
config = KiaraConfig()
api = KiaraAPI(kiara_config=config)
cls._default_instance = api
return api
else:
raise NotImplementedError()
def __init__(self, kiara_config: Union["KiaraConfig", None] = None):
if kiara_config is None:
from kiara.context import Kiara, KiaraConfig
kiara_config = KiaraConfig()
self._kiara_config: KiaraConfig = kiara_config
self._contexts: Dict[str, Kiara] = {}
self._workflow_cache: Dict[uuid.UUID, Workflow] = {}
self._current_context: Union[None, Kiara] = None
self._current_context_alias: Union[None, str] = None
@cached_property
def doc(self) -> Dict[str, str]:
"""Get the documentation for this API."""
result = {}
for method_name in dir(self):
if method_name.startswith("_"):
continue
method = getattr(self.__class__, method_name)
doc = inspect.getdoc(method)
if doc is None:
doc = "-- n/a --"
else:
doc = textwrap.dedent(doc)
result[method_name] = doc
return result
def list_available_plugin_names(
    self, regex: str = "^kiara[-_]plugin\\..*"
) -> List[str]:
    r"""
    Get a list of all available plugins.

    Arguments:
        regex: an optional regex to indicate the plugin naming scheme (default: /^kiara[-_]plugin\..*/), an empty value falls back to the default

    Returns:
        a list of plugin names
    """
    # the docstring above is a raw string, otherwise the '\.' in the documented default is an invalid escape sequence
    return KiaraPluginInfos.get_available_plugin_names(
        kiara=self.context, regex=regex or "^kiara[-_]plugin\\..*"
    )
def retrieve_plugin_info(self, plugin_name: str) -> KiaraPluginInfo:
    """
    Retrieve information about a single plugin.

    Arguments:
        plugin_name: the name of the plugin

    Returns:
        a 'KiaraPluginInfo' object, created for the plugin name within the current context
    """
    return KiaraPluginInfo.create_from_instance(
        kiara=self.context, instance=plugin_name
    )
def retrieve_plugin_infos(self, plugin_name_regex: str = "^kiara[-_]plugin\\..*"):
    """
    Retrieve information about a group of plugins.

    Arguments:
        plugin_name_regex: the regex that is handed to the plugin info group, an empty value falls back to the default pattern

    Returns:
        the object created by 'KiaraPluginInfos.create_group' for the current context
    """
    effective_regex = plugin_name_regex or "^kiara[-_]plugin\\..*"
    return KiaraPluginInfos.create_group(self.context, None, effective_regex)
@property
def context(self) -> "Kiara":
"""
Return the kiara context.
DON"T USE THIS! This is going away in the production release.
"""
if self._current_context is None:
self._current_context = self._kiara_config.create_context(
extra_pipelines=None
)
self._current_context_alias = self._kiara_config.default_context
return self._current_context
def get_runtime_config(self) -> "KiaraRuntimeConfig":
"""Retrieve the current runtime configuration."""
return self.context.runtime_config
def get_context_info(self) -> ContextInfo:
    """
    Retrieve information about the current kiara context.

    Returns:
        a 'ContextInfo' object, created from the config of the current context and the runtime config
    """
    context_name = self.get_current_context_name()
    return ContextInfo.create_from_context_config(
        self._kiara_config.get_context_config(context_name),
        context_name=context_name,
        runtime_config=self._kiara_config.runtime_config,
    )
def ensure_plugin_packages(
    self, package_names: Union[str, Iterable[str]], update: bool = False
) -> Union[bool, None]:
    """
    Ensure that the specified packages are installed.

    This functionality is provisional, don't rely on it being available long-term. Ideally, we'll have other, external ways to manage the environment.

    If 'update' is False, every name is normalized ('_' replaced with '-', 'kiara-plugin.' prefix added if missing) and
    only packages that are not yet part of the Python runtime environment are installed. Names that start with 'git:'
    are always installed, from the 'develop' branch of the matching repository.

    While the installation is running, all contexts of this API object are replaced with placeholders that raise
    on any access. Afterwards (also if the installation fails) the cached contexts and the environment registry
    are reset, and the previously active context is re-activated.

    Arguments:
        package_names: The names of the packages to install (if empty, the official kiara plugins are used).
        update: If True, update the packages if they are already installed

    Returns:
        'None' if run in jupyter, 'True' if any packages were installed, 'False' otherwise.

    Raises:
        SystemExit: in jupyter, after the install command was issued (all cells need to be re-run manually)
        Exception: if pip finishes with a non-zero exit code
    """
    if isinstance(package_names, str):
        package_names = [package_names]

    env_reg = EnvironmentRegistry.instance()
    python_env: PythonRuntimeEnvironment = env_reg.environments[  # type: ignore
        "python"
    ]  # type: ignore

    if not package_names:
        package_names = OFFICIAL_KIARA_PLUGINS  # type: ignore

    plugin_packages: List[str]
    if update:
        # NOTE(review): with 'update=True' the names are handed to pip verbatim (no 'kiara-plugin.' prefix,
        # no 'git:' handling) -- confirm that this is intended.
        # Materialize, the names are used several times below (pip command, messages).
        plugin_packages = list(package_names)
    else:
        plugin_packages = []
        installed = {p.name.replace("_", "-") for p in python_env.packages}
        for package_name in package_names:
            git = package_name.startswith("git:")
            if git:
                # only strip the marker prefix, not later occurrences of 'git:'
                package_name = package_name[len("git:") :]

            package_name = package_name.replace("_", "-")
            if not package_name.startswith("kiara-plugin."):
                package_name = f"kiara-plugin.{package_name}"

            if git:
                # the repository name uses underscores
                repo_name = package_name.replace("-", "_")
                plugin_packages.append(
                    f"git+https://x:x@github.com/DHARPA-project/{repo_name}@develop"
                )
            elif package_name not in installed:
                plugin_packages.append(package_name)

    in_jupyter = "google.colab" in sys.modules or "jupyter_client" in sys.modules

    if not plugin_packages:
        if in_jupyter:
            return None
        else:
            # nothing to do
            return False

    class DummyContext(object):
        def __getattribute__(self, item):
            raise Exception(
                "Currently installing plugins, no other operations are allowed."
            )

    current_context_name = self._current_context_alias
    for k in list(self._contexts):
        self._contexts[k] = DummyContext()  # type: ignore
    self._current_context = DummyContext()  # type: ignore

    cmd = ["-q", "--isolated", "install"]
    if update:
        cmd.append("--upgrade")
    cmd.extend(plugin_packages)

    try:
        if in_jupyter:
            from IPython import get_ipython

            ipython = get_ipython()
            cmd_str = f"sc -l stdout = {sys.executable} -m pip {' '.join(cmd)}"
            ipython.magic(cmd_str)
            # marker value, handled below
            exit_code = 100
        else:
            import pip._internal.cli.main as pip

            log_message(
                "install.python_packages", packages=plugin_packages, update=update
            )
            exit_code = pip.main(cmd)
    finally:
        # always get rid of the placeholder contexts, otherwise a failed
        # installation would leave this API object unusable
        self._contexts.clear()
        self._current_context = None
        self._current_context_alias = None

        EnvironmentRegistry._instance = None
        if current_context_name:
            self.set_active_context(context_name=current_context_name)

    if exit_code == 100:
        raise SystemExit(
            f"Please manually re-run all cells. Updated or newly installed plugin packages: {', '.join(plugin_packages)}."
        )
    elif exit_code != 0:
        raise Exception(
            f"Failed to install plugin packages: {', '.join(plugin_packages)}"
        )

    return True
# ==================================================================================================================
# context-management related functions
def list_context_names(self) -> List[str]:
"""list the names of all available/registered contexts."""
return list(self._kiara_config.available_context_names)
def retrieve_context_infos(self) -> ContextInfos:
    """Retrieve information about the available/registered contexts."""
    context_configs = self._kiara_config.context_configs
    return ContextInfos.create_context_infos(context_configs)
def get_current_context_name(self) -> str:
"""Retrieve the name fo the current context."""
if self._current_context_alias is None:
self.context
return self._current_context_alias # type: ignore
def create_new_context(self, context_name: str, set_active: bool) -> None:
"""
Create a new context.
Arguments:
context_name: the name of the new context
set_active: set the newly created context as the active one
"""
if context_name in self.list_context_names():
raise Exception(
f"Can't create context with name '{context_name}': context already exists."
)
ctx = self._kiara_config.create_context(context_name, extra_pipelines=None)
if set_active:
self._current_context = ctx
self._current_context_alias = context_name
def set_active_context(self, context_name: str, create: bool = False) -> None:
if not context_name:
raise Exception("No context name provided.")
if context_name == self._current_context_alias:
return
if context_name not in self.list_context_names():
if create:
self._current_context = self._kiara_config.create_context(
context=context_name, extra_pipelines=None
)
self._current_context_alias = context_name
return
else:
raise Exception(f"No context with name '{context_name}' available.")
self._current_context = self._kiara_config.create_context(
context=context_name, extra_pipelines=None
)
self._current_context_alias = context_name
# ==================================================================================================================
# methods for data_types
def list_data_type_names(self, include_profiles: bool = False) -> List[str]:
    """
    Get a list of the names of all registered data types.

    Arguments:
        include_profiles: if True, also include the names of all registered data type profiles
    """
    type_registry = self.context.type_registry
    return type_registry.get_data_type_names(include_profiles=include_profiles)
def is_internal_data_type(self, data_type_name: str) -> bool:
    """Check whether the data type is predominantly used internally by kiara, or whether it should be exposed to the user."""
    type_registry = self.context.type_registry
    return type_registry.is_internal_type(data_type_name=data_type_name)
def retrieve_data_types_info(
    self,
    filter: Union[str, Iterable[str], None] = None,
    include_data_type_profiles: bool = False,
    python_package: Union[None, str] = None,
) -> DataTypeClassesInfo:
    """
    Retrieve information about all data types.

    A data type is a Python class that inherits from [DataType][kiara.data_types.DataType], and it wraps a specific
    Python class that holds the actual data and provides metadata and convenience methods for managing the data internally. Data types are not directly used by users, but they are exposed in the input/output schemas of modules and other data-related features.

    Filter matching is a case-insensitive substring match against the data type name.

    Arguments:
        filter: an optional string or (list of strings) the returned datatype ids have to match (all filters in the case of a list)
        include_data_type_profiles: if True, also include the names of all registered data type profiles (not used if 'python_package' is provided)
        python_package: if provided, only return data types that are defined in the given python package

    Returns:
        an object containing all information about all data types
    """
    if isinstance(filter, str):
        filter = [filter]
    # Materialize the filter tokens once: 'filter' can be any iterable, and a one-shot
    # iterator would be used up after the first data type name was checked.
    filter_tokens: List[str] = [f.lower() for f in filter] if filter else []

    def _matches(name: str) -> bool:
        lowered = name.lower()
        return all(token in lowered for token in filter_tokens)

    type_registry = self.context.type_registry

    if python_package:
        package_info = type_registry.get_context_metadata(
            only_for_package=python_package
        )
        if not filter:
            package_info.group_title = (
                f"All data types in package '{python_package}'"
            )
            return package_info  # type: ignore

        filtered_types: Dict[str, DataTypeClassInfo] = {
            name: info
            for name, info in package_info.item_infos.items()
            if _matches(name)
        }
        data_types_info = DataTypeClassesInfo(
            group_title=f"Filtered data types in package '{python_package}'",
            item_infos=filtered_types,
        )
        data_types_info._kiara = self.context
        return data_types_info  # type: ignore

    all_names = type_registry.get_data_type_names(
        include_profiles=include_data_type_profiles
    )
    if filter:
        title = f"Filtered data_types: {filter}"
        data_type_names: Iterable[str] = [n for n in all_names if _matches(n)]
    else:
        title = "All data types"
        data_type_names = all_names

    data_types = {d: type_registry.get_data_type_cls(d) for d in data_type_names}
    data_types_info = DataTypeClassesInfo.create_from_type_items(  # type: ignore
        kiara=self.context, group_title=title, **data_types
    )
    return data_types_info  # type: ignore
def retrieve_data_type_info(self, data_type_name: str) -> DataTypeClassInfo:
    """
    Retrieve information about a specific data type.

    Arguments:
        data_type_name: the registered name of the data type

    Returns:
        an object containing all information about a data type
    """
    data_type_cls = self.context.type_registry.get_data_type_cls(data_type_name)
    return DataTypeClassInfo.create_from_type_class(
        kiara=self.context, type_cls=data_type_cls
    )
# ==================================================================================================================
# methods for module and operations info
def list_module_type_names(self) -> List[str]:
    """Get a list of the names of all registered module types."""
    module_registry = self.context.module_registry
    return [*module_registry.get_module_type_names()]
def retrieve_module_types_info(
    self,
    filter: Union[None, str, Iterable[str]] = None,
    python_package: Union[str, None] = None,
) -> ModuleTypesInfo:
    """
    Retrieve information for all available module types (or a filtered subset thereof).

    A module type is Python class that inherits from [KiaraModule][kiara.modules.KiaraModule], and is the basic
    building block for processing pipelines. Module types are not used directly by users, Operations are. Operations
    are instantiated modules (meaning: the module & some (optional) configuration).

    Filter matching is a case-insensitive substring match against the module type name.

    Arguments:
        filter: an optional string (or list of string) the returned module names have to match (all filters in case of list)
        python_package: an optional string, if provided, only modules from the specified python package are returned

    Returns:
        a mapping object containing module names as keys, and information about the modules as values
    """
    # Materialize the filter tokens once: 'filter' can be any iterable, and a one-shot
    # iterator would be used up after the first module name was checked.
    if not filter:
        filter_tokens: List[str] = []
    elif isinstance(filter, str):
        filter_tokens = [filter.lower()]
    else:
        filter_tokens = [f.lower() for f in filter]

    def _matches(name: str) -> bool:
        lowered = name.lower()
        return all(token in lowered for token in filter_tokens)

    module_registry = self.context.module_registry

    if python_package:
        package_info = module_registry.get_context_metadata(
            only_for_package=python_package
        )
        if not filter:
            package_info.group_title = f"All modules in package '{python_package}'"
            return package_info  # type: ignore

        filtered_types: Dict[str, ModuleTypeInfo] = {
            name: info
            for name, info in package_info.item_infos.items()
            if _matches(name)
        }
        module_types_info = ModuleTypesInfo(
            group_title=f"Filtered modules: {filter} (in package '{python_package}')",
            item_infos=filtered_types,
        )
        module_types_info._kiara = self.context
        return module_types_info  # type: ignore

    if filter:
        # a single filter string is displayed as a one-item list in the title
        displayed = [filter] if isinstance(filter, str) else filter
        title = f"Filtered modules: {displayed}"
        module_types_names: Iterable[str] = [
            m for m in module_registry.get_module_type_names() if _matches(m)
        ]
    else:
        title = "All modules"
        module_types_names = module_registry.get_module_type_names()

    module_types = {
        n: module_registry.get_module_class(n) for n in module_types_names
    }
    module_types_info = ModuleTypesInfo.create_from_type_items(  # type: ignore
        kiara=self.context, group_title=title, **module_types
    )
    return module_types_info  # type: ignore
def retrieve_module_type_info(self, module_type: str) -> ModuleTypeInfo:
    """
    Retrieve information about a specific module type.

    This can be used to retrieve information like module documentation and configuration options.

    Arguments:
        module_type: the registered name of the module

    Returns:
        an object containing all information about a module type
    """
    module_cls = self.context.module_registry.get_module_class(module_type)
    return ModuleTypeInfo.create_from_type_class(
        kiara=self.context, type_cls=module_cls
    )
def create_operation(
    self,
    module_type: str,
    module_config: Union[Mapping[str, Any], str, None] = None,
) -> Operation:
    """
    Create an [Operation][kiara.models.module.operation.Operation] instance for the specified module type and (optional) config.

    This can be used to get information about the operation itself, it's inputs & outputs schemas, documentation etc.

    Arguments:
        module_type: the registered name of the module
        module_config: (Optional) configuration for the module instance, either as mapping, or as a json or yaml string.

    Returns:
        an Operation instance (which contains all the available information about an instantiated module)

    Raises:
        Exception: if the config string can't be parsed, or if the module type is 'pipeline' and the config is empty
    """
    if module_config is None:
        module_config = {}
    elif isinstance(module_config, str):
        config_string = module_config
        try:
            # 'json.loads' parses a string ('json.load' expects a file-like object)
            module_config = json.loads(config_string)
        except Exception:
            # not valid json, try yaml
            try:
                module_config = yaml.load(config_string)  # type: ignore
            except Exception as e:
                raise Exception(
                    f"Can't parse module config string: {config_string}."
                ) from e

    if module_type == "pipeline":
        if not module_config:
            raise Exception("Pipeline configuration can't be empty.")
        assert module_config is None or isinstance(module_config, Mapping)
        # this is the module-level helper function, not this method
        operation = create_operation(
            "pipeline", operation_config=module_config, kiara=self.context
        )
        return operation

    mc = Manifest(module_type=module_type, module_config=module_config)
    module_obj = self.context.module_registry.create_module(mc)
    return module_obj.operation
def list_operation_ids(
self,
filter: Union[str, None, Iterable[str]] = None,
input_types: Union[str, Iterable[str], None] = None,
output_types: Union[str, Iterable[str], None] = None,
operation_types: Union[str, Iterable[str], None] = None,
include_internal: bool = False,
python_packages: Union[str, None, Iterable[str]] = None,
) -> List[str]:
"""
Get a list of all operation ids that match the specified filter.
Arguments:
filter: the (optional) filter string(s), an operation must match all of them to be included in the result
input_types: each operation must have at least one input that matches one of the specified types
output_types: each operation must have at least one output that matches one of the specified types
operation_types: only include operations of the specified type(s)
include_internal: whether to include operations that are predominantly used internally in kiara.
python_packages: only include operations that are contained in one of the provided python packages
"""
if not filter and include_internal and not python_packages:
return sorted(self.context.operation_registry.operation_ids)
else:
return sorted(
self.list_operations(
filter=filter,
input_types=input_types,
output_types=output_types,
operation_types=operation_types,
include_internal=include_internal,
python_packages=python_packages,
).keys()
)
def get_operation(
self,
operation: Union[Mapping[str, Any], str, Path],
allow_external: Union[bool, None] = None,
) -> Operation:
"""
Return the operation instance with the specified id.
This can be used to get information about a specific operation, like inputs/outputs scheman, documentation, etc.
The order in which the operation argument is resolved:
- if it's a string, and an existing, registered operation_id, the associated operation is returned
- if it's a path to an existing file, the content of the file is loaded into a dict and depending on the content a pipeline module will be created, or a 'normal' manifest (if module_type is a key in the dict)
Arguments:
operation: the operation id, module_type_name, path to a file, or url
allow_external: if True, allow loading operations from external sources (e.g. a URL), if 'None' is provided, the configured value in the runtime configuration is used.
Returns:
operation instance data
"""
_module_type = None
_module_config: Any = None
if allow_external is None:
allow_external = self.get_runtime_config().allow_external
if isinstance(operation, Path):
operation = operation.as_posix()
if (
isinstance(operation, Mapping)
and "module_type" in operation.keys()
and "module_config" in operation.keys()
and not operation["module_config"]
):
operation = operation["module_type"]
if isinstance(operation, str):
if operation in self.list_operation_ids(include_internal=True):
_operation = self.context.operation_registry.get_operation(operation)
return _operation
if not allow_external:
raise NoSuchExecutionTargetException(
selected_target=operation,
available_targets=self.context.operation_registry.operation_ids,
msg=f"Can't find operation with id '{operation}', and external operations are not allowed.",
)
if os.path.isfile(operation):
try:
from kiara.models.module.pipeline import PipelineConfig
# we use the 'from_file' here, because that will resolve any relative paths in the pipeline
# if this doesn't work, we just assume the file is not a pipeline configuration but
# a manifest file with 'module_type' and optional 'module_config' keys
pipeline_conf = PipelineConfig.from_file(
path=operation, kiara=self.context
)
_module_config = pipeline_conf.model_dump()
except Exception as e:
log_exception(e)
_module_config = get_data_from_file(operation)
elif operation.startswith("http"):
_module_config = get_data_from_url(operation)
else:
try:
_module_config = json.load(operation) # type: ignore
except Exception:
try:
_module_config = yaml.load(operation) # type: ignore
except Exception:
raise Exception(
f"Can't parse configuration string: {operation}."
)
if not isinstance(_module_config, Mapping):
raise NoSuchExecutionTargetException(
selected_target=operation,
available_targets=self.context.operation_registry.operation_ids,
msg=f"Can't find operation or execution target for string '{operation}'.",
)
else:
_module_config = dict(operation) # type: ignore
if "module_type" in _module_config.keys():
_module_type = _module_config["module_type"]
_module_config = _module_config.get("module_config", {})
else:
_module_type = "pipeline"
op = self.create_operation(
module_type=_module_type, module_config=_module_config
)
return op
def list_operations(
    self,
    filter: Union[str, None, Iterable[str]] = None,
    input_types: Union[str, Iterable[str], None] = None,
    output_types: Union[str, Iterable[str], None] = None,
    operation_types: Union[str, Iterable[str], None] = None,
    python_packages: Union[str, Iterable[str], None] = None,
    include_internal: bool = False,
) -> "OperationsMap":
    """
    List all available operations, optionally filter.

    Arguments:
        filter: the (optional) filter string(s), an operation must match all of them to be included in the result (case-insensitive substring match on the operation id)
        input_types: each operation must have at least one input that matches one of the specified types
        output_types: each operation must have at least one output that matches one of the specified types
        operation_types: only include operations of the specified type(s)
        include_internal: whether to include operations that are predominantly used internally in kiara.
        python_packages: only include operations that are contained in one of the provided python packages

    Returns:
        a dictionary with the operation id as key, and [kiara.models.module.operation.Operation] instance data as value

    Raises:
        Exception: if one of the specified operation types is not registered
    """
    operation_registry = self.context.operation_registry

    if operation_types:
        if isinstance(operation_types, str):
            operation_types = [operation_types]
        of_types: Dict[str, Operation] = {}
        for op_type_name in operation_types:
            op_type = operation_registry.operation_types.get(op_type_name, None)
            if op_type is None:
                raise Exception(f"Operation type not registered: {op_type_name}")
            of_types.update(op_type.operations)
        operations: Mapping[str, Operation] = of_types
    else:
        operations = operation_registry.operations

    # All iterable arguments below are materialized once before they are applied:
    # a one-shot iterator would otherwise be used up after the first operation.

    if filter:
        tokens = [filter] if isinstance(filter, str) else list(filter)
        # empty tokens are ignored
        needles = [token.lower() for token in tokens if token]
        operations = {
            op_id: op
            for op_id, op in operations.items()
            if all(needle in op_id.lower() for needle in needles)
        }

    if not include_internal:
        operations = {
            op_id: op
            for op_id, op in operations.items()
            if not op.operation_details.is_internal_operation
        }

    if input_types:
        wanted_inputs = (
            [input_types] if isinstance(input_types, str) else list(input_types)
        )
        operations = {
            op_id: op
            for op_id, op in operations.items()
            if any(
                schema.type in wanted_inputs for schema in op.inputs_schema.values()
            )
        }

    if output_types:
        wanted_outputs = (
            [output_types] if isinstance(output_types, str) else list(output_types)
        )
        operations = {
            op_id: op
            for op_id, op in operations.items()
            if any(
                schema.type in wanted_outputs
                for schema in op.outputs_schema.values()
            )
        }

    if python_packages:
        packages = (
            [python_packages]
            if isinstance(python_packages, str)
            else list(python_packages)
        )
        in_packages: Dict[str, Operation] = {}
        for op_id, op in operations.items():
            # the package name is taken from the context labels of the operation info
            info = OperationInfo.create_from_instance(
                kiara=self.context, instance=op
            )
            pkg = info.context.labels.get("package", None)
            if pkg in packages:
                in_packages[op_id] = op
        operations = in_packages

    from kiara.interfaces.python_api.models.doc import OperationsMap

    return OperationsMap.model_construct(root=operations)  # type: ignore
def retrieve_operation_info(
    self, operation: str, allow_external: bool = False
) -> OperationInfo:
    """
    Return the full information for the specified operation id.

    This is similar to the 'get_operation' method, but returns additional information. Only use this instead of
    'get_operation' if you need the additional info, as it's more expensive to get.

    Arguments:
        operation: the operation id
        allow_external: if False, the operation is looked up in the operation registry, otherwise it is created from the provided string

    Returns:
        augmented operation instance data
    """
    if not allow_external:
        op = self.context.operation_registry.get_operation(operation_id=operation)
    else:
        # Pass the current context explicitly (like the 'create_operation' method of this class does),
        # so the operation is created in the same context that is used for the info object below.
        op = create_operation(module_or_operation=operation, kiara=self.context)

    return OperationInfo.create_from_operation(kiara=self.context, operation=op)
def retrieve_operations_info(
    self,
    *filters,
    input_types: Union[str, Iterable[str], None] = None,
    output_types: Union[str, Iterable[str], None] = None,
    operation_types: Union[str, Iterable[str], None] = None,
    python_packages: Union[str, Iterable[str], None] = None,
    include_internal: bool = False,
) -> OperationGroupInfo:
    """
    Retrieve information about the matching operations.

    This retrieves the same list of operations as [list_operations][kiara.interfaces.python_api.KiaraAPI.list_operations],
    but augments each result instance with additional information that might be useful in frontends.

    'OperationInfo' objects contains augmented information on top of what 'normal' [Operation][kiara.models.module.operation.Operation] objects
    hold, but they can take longer to create/resolve. If you don't need any
    of the augmented information, just use the [list_operations][kiara.interfaces.python_api.KiaraAPI.list_operations] method
    instead.

    Arguments:
        filters: the (optional) filter strings, an operation must match all of them to be included in the result
        input_types: each operation must have at least one input that matches one of the specified types
        output_types: each operation must have at least one output that matches one of the specified types
        operation_types: only include operations of the specified type(s)
        python_packages: only include operations that are contained in one of the provided python packages
        include_internal: whether to include operations that are predominantly used internally in kiara.

    Returns:
        a wrapper object containing the information about the matching operations, the title of the group indicates whether filters were used
    """
    group_title = "Filtered operations" if filters else "Available operations"

    matching_operations = self.list_operations(
        filters,
        input_types=input_types,
        output_types=output_types,
        include_internal=include_internal,
        operation_types=operation_types,
        python_packages=python_packages,
    )

    return OperationGroupInfo.create_from_operations(
        kiara=self.context, group_title=group_title, **matching_operations
    )
# ==================================================================================================================
# methods relating to pipelines
def list_pipeline_ids(
self,
filter: Union[str, None, Iterable[str]] = None,
input_types: Union[str, Iterable[str], None] = None,
output_types: Union[str, Iterable[str], None] = None,
include_internal: bool = False,
python_packages: Union[str, None, Iterable[str]] = None,
) -> List[str]:
"""
Get a list of all pipeline (operation) ids that match the specified filter.
Arguments:
filter: an optional single or list of filters (all filters must match the operation id for the operation to be included)
include_internal: also return internal pipelines
"""
return self.list_operation_ids(
filter=filter,
input_types=input_types,
output_types=output_types,
operation_types=["pipeline"],
include_internal=include_internal,
python_packages=python_packages,
)
def list_pipelines(
    self,
    filter: Union[str, None, Iterable[str]] = None,
    input_types: Union[str, Iterable[str], None] = None,
    output_types: Union[str, Iterable[str], None] = None,
    python_packages: Union[str, Iterable[str], None] = None,
    include_internal: bool = False,
) -> "PipelinesMap":
    """
    List all available pipelines, optionally filter.

    Arguments:
        filter: the (optional) filter string(s), an operation must match all of them to be included in the result
        input_types: each operation must have at least one input that matches one of the specified types
        output_types: each operation must have at least one output that matches one of the specified types
        python_packages: only include operations that are contained in one of the provided python packages
        include_internal: whether to include operations that are predominantly used internally in kiara.

    Returns:
        a dictionary with the operation id as key, and the structure of the pipeline as value
    """
    from kiara.interfaces.python_api.models.doc import PipelinesMap

    pipeline_operations = self.list_operations(
        filter=filter,
        input_types=input_types,
        output_types=output_types,
        operation_types=["pipeline"],
        python_packages=python_packages,
        include_internal=include_internal,
    )

    # the structure is part of the pipeline config that is attached to the details of each pipeline operation
    structures: Dict[str, "PipelineStructure"] = {
        op.operation_id: op.operation_details.pipeline_config.structure
        for op in pipeline_operations.values()
    }
    return PipelinesMap.model_construct(root=structures)
def get_pipeline(
    self,
    pipeline: Union[Mapping[str, Any], str, Path],
    allow_external: Union[bool, None] = None,
) -> "PipelineStructure":
    """
    Return the pipeline (structure) instance for the specified pipeline reference.

    The reference is resolved by 'get_operation' (both arguments are forwarded to it unchanged);
    the resolved operation must have the module type 'pipeline'.

    Arguments:
        pipeline: the pipeline reference, in any form that 'get_operation' accepts
        allow_external: forwarded to 'get_operation'

    Returns:
        the structure of the pipeline config that backs the resolved operation

    Raises:
        KiaraException: if the resolved operation is not a pipeline
    """
    op = self.get_operation(operation=pipeline, allow_external=allow_external)

    if op.module_type == "pipeline":
        details: PipelineOperationDetails = op.operation_details  # type: ignore
        return details.pipeline_config.structure

    raise KiaraException(
        f"Operation '{op.operation_id}' is not a pipeline, but a '{op.module_type}'"
    )
def retrieve_pipeline_info(
    self, pipeline: str, allow_external: bool = False
) -> "PipelineInfo":
    """
    Return the full information for the specified pipeline.

    In contrast to 'get_pipeline', this instantiates a 'Pipeline' object from the pipeline structure
    and wraps it in a 'PipelineInfo' object.

    Arguments:
        pipeline: the pipeline (operation) id
        allow_external: if False, the operation is looked up in the operation registry of this context,
            otherwise it is assembled via 'create_operation'

    Returns:
        augmented pipeline instance data

    Raises:
        KiaraException: if the resolved operation is not a pipeline
    """
    if allow_external:
        op = create_operation(module_or_operation=pipeline)
    else:
        op = self.context.operation_registry.get_operation(operation_id=pipeline)

    is_pipeline = op.module_type == "pipeline"
    if not is_pipeline:
        raise KiaraException(
            f"Operation '{op.operation_id}' is not a pipeline, but a '{op.module_type}'"
        )

    from kiara.models.module.pipeline.pipeline import Pipeline, PipelineInfo

    details: PipelineOperationDetails = op.operation_details  # type: ignore
    structure = details.pipeline_config.structure
    instance = Pipeline(structure=structure, kiara=self.context)
    return PipelineInfo.create_from_instance(kiara=self.context, instance=instance)
def retrieve_pipelines_info(
    self,
    *filters,
    input_types: Union[str, Iterable[str], None] = None,
    output_types: Union[str, Iterable[str], None] = None,
    python_packages: Union[str, Iterable[str], None] = None,
    include_internal: bool = False,
) -> "PipelineGroupInfo":
    """
    Retrieve information about the matching pipelines.

    The matching operations are looked up via 'list_operations' (restricted to operation type
    'pipeline'); a 'Pipeline' instance is created for each of them, and all instances are wrapped in a
    'PipelineGroupInfo' object. If you don't need the augmented information, the
    [list_pipelines][kiara.interfaces.python_api.KiaraAPI.list_pipelines] method is the lighter alternative.

    Arguments:
        filters: the (optional) filter strings, an operation must match all of them to be included in the result
        input_types: each operation must have at least one input that matches one of the specified types
        output_types: each operation must have at least one output that matches one of the specified types
        python_packages: only include operations that are contained in one of the provided python packages
        include_internal: whether to include operations that are predominantly used internally in kiara.

    Returns:
        a group info object created from the matching pipelines (keyed by operation id); its title
        depends on whether any filters were provided
    """
    title = "Filtered pipelines" if filters else "Available pipelines"

    matching_ops = self.list_operations(
        filters,
        input_types=input_types,
        output_types=output_types,
        include_internal=include_internal,
        operation_types=["pipeline"],
        python_packages=python_packages,
    )

    from kiara.models.module.pipeline.pipeline import Pipeline, PipelineGroupInfo

    # one Pipeline instance per matching operation, keyed by operation id
    instances = {
        op_id: Pipeline(
            structure=op.operation_details.pipeline_config.structure,
            kiara=self.context,
        )
        for op_id, op in matching_ops.items()
    }

    return PipelineGroupInfo.create_from_pipelines(
        kiara=self.context, group_title=title, **instances
    )
def register_pipeline(
    self,
    data: Union[Path, str, Mapping[str, Any]],
    operation_id: Union[str, None] = None,
) -> Operation:
    """
    Register a pipeline as a new operation in this context.

    The call is delegated to the operation registry of the current context.

    Arguments:
        data: a dict or a path to a json/yaml file containing the definition
        operation_id: the id to use for the operation (if not specified, the id will be auto-determined)

    Returns:
        the assembled operation
    """
    registry = self.context.operation_registry
    registered = registry.register_pipeline(data=data, operation_id=operation_id)
    return registered
def register_pipelines(
    self, *pipeline_paths: Union[str, Path]
) -> Dict[str, Operation]:
    """
    Register all pipelines found in the specified paths.

    The paths are handed over unchanged to the operation registry of the current context.

    Arguments:
        pipeline_paths: the paths to register pipelines from

    Returns:
        the result of the registry call (annotated as a dictionary of operations with string keys)
    """
    registry = self.context.operation_registry
    return registry.register_pipelines(*pipeline_paths)
# ==================================================================================================================
# methods relating to values and data
def register_data(
    self,
    data: Any,
    data_type: Union[None, str, ValueSchema, Mapping[str, Any]] = None,
    reuse_existing: bool = False,
) -> Value:
    """
    Register data with kiara.

    This creates a value instance from the data (via the data registry of the current context) and
    returns it. The data/value itself won't be stored in a store, you have to use the 'store_value'
    function for that.

    Arguments:
        data: the data to register
        data_type: the data type (or schema) of the data; although optional in the signature, it
            currently must be provided, because type inference is not implemented
        reuse_existing: whether to re-use an existing value that is already registered and has the same hash.

    Returns:
        a [kiara.models.values.value.Value] instance

    Raises:
        NotImplementedError: if no data type is provided
    """
    type_missing = data_type is None
    if type_missing:
        raise NotImplementedError(
            "Infering data types not implemented yet. Please provide one manually."
        )

    # the data type/schema is passed on as the 'schema' argument of the registry
    return self.context.data_registry.register_data(
        data=data, schema=data_type, reuse_existing=reuse_existing
    )
def list_value_ids(self, **matcher_params) -> List[uuid.UUID]:
    """
    List all available value ids for this kiara context.

    Without matcher parameters, the ids are retrieved directly from the data registry, without looking
    up the details of each value. If matcher parameters are provided,
    [list_values][kiara.interfaces.python_api.KiaraAPI.list_values] is called under the hood and the
    ids are extracted from its result, so in that case using 'list_values' directly is usually preferable.

    Arguments:
        matcher_params: the (optional) filter parameters, check the [ValueMatcher][kiara.models.values.matchers.ValueMatcher] class for available parameters

    Returns:
        a sorted list of value ids
    """
    if not matcher_params:
        # fast path: no need to resolve the values themselves
        all_ids = self.context.data_registry.retrieve_all_available_value_ids()
        return sorted(all_ids)

    matched = self.list_values(**matcher_params)
    return sorted(item.value_id for item in matched.values())
def list_values(self, **matcher_params: Any) -> ValueMapReadOnly:
    """
    List all available values, optionally filter.

    With matcher parameters, a 'ValueMatcher' is created from them and used to query the data registry.
    Without, every value id the data registry reports as available is resolved to its value.

    Arguments:
        matcher_params: the (optional) filter parameters, check the [ValueMatcher][kiara.models.values.matchers.ValueMatcher] class for available parameters

    Returns:
        a read-only value map, with the (stringified) value_id as field name, and the [kiara.models.values.value.Value] as value
    """
    if matcher_params:
        value_matcher = ValueMatcher.create_matcher(**matcher_params)
        found = self.context.data_registry.find_values(matcher=value_matcher)
    else:
        # TODO: make that parallel?
        found = {}
        for value_id in self.context.data_registry.retrieve_all_available_value_ids():
            found[value_id] = self.context.data_registry.get_value(value_id)

    # field names need to be strings, so the ids are stringified here
    fields = {str(value_id): value for value_id, value in found.items()}
    return ValueMapReadOnly.create_from_values(**fields)
def get_value(self, value: Union[str, Value, uuid.UUID, Path]) -> Value:
    """
    Retrieve a value instance with the specified id or alias.

    The lookup is delegated to the data registry of the current context.
    Raises an exception if no value could be found.

    Arguments:
        value: a value id, alias or object that has a 'value_id' attribute.

    Returns:
        the Value instance
    """
    data_registry = self.context.data_registry
    return data_registry.get_value(value=value)
def query_value(
    self,
    value_or_path: Union[str, Value, uuid.UUID],
    query_path: Union[str, None] = None,
) -> Any:
    """
    Retrieve a value attribute with the specified id or alias.

    A query path is delimited by "::", and has the following format:

    ```
    <value_id_or_alias>::[<category_name>]::[<attribute_name>]::[...]
    ```

    Currently supported categories:
    - "data": the data of the value
    - "properties": the properties of the value

    If no category is specified, the value instance itself is returned.

    Raises an exception if no value could be found.

    Arguments:
        value_or_path: a value or value reference, or a query path containing the value id or alias as first token
        query_path: a query path which will be appended to a potential query path computed from the first argument

    Returns:
        the attribute value

    Raises:
        KiaraException: if the category is not supported, or if the sub-path can't be resolved (in
            which case the exception details list the valid sub-keys/paths, where available)
    """
    if isinstance(value_or_path, str):
        # first token references the value, the rest (if any) is the start of the query path
        tokens = value_or_path.split(VALUE_ATTR_DELIMITER)
        value_id = tokens.pop(0)
        _value = self.get_value(value=value_id)
    else:
        tokens = []
        _value = self.get_value(value=value_or_path)

    if query_path:
        tokens.extend(query_path.split(VALUE_ATTR_DELIMITER))

    if not tokens:
        return _value

    category = tokens.pop(0)
    if category == "properties":
        current_result: Any = _value.get_all_property_data(flatten_models=True)
    elif category == "data":
        current_result = _value.data
    else:
        raise KiaraException(
            f"Invalid query path category: {category}. Valid categories are: {', '.join(VALID_VALUE_QUERY_CATEGORIES)}"
        )

    if not tokens:
        return current_result

    path = VALUE_ATTR_DELIMITER.join(tokens)
    try:
        return dpath.get(current_result, path, separator=VALUE_ATTR_DELIMITER)
    except Exception as e:

        def collect_paths(prefix: str, node: Mapping, collected: List[str]) -> None:
            # Depth-first walk that records the full path of every leaf. The prefix is only
            # joined in when non-empty, so top-level keys don't get a leading delimiter.
            for key, child in node.items():
                full_path = (
                    f"{prefix}{VALUE_ATTR_DELIMITER}{key}" if prefix else str(key)
                )
                if isinstance(child, dict):
                    collect_paths(full_path, child, collected)
                else:
                    collected.append(full_path)

        if isinstance(current_result, Mapping):
            details = "Valid (base) sub-keys are:\n\n"
            for k in current_result.keys():
                details += f" - {k}\n"
            all_paths: List[str] = []
            collect_paths("", current_result, all_paths)
            details += "\nValid (full) sub-paths are:\n\n"
            for p in all_paths:
                details += f" - {p}\n"
        else:
            # Nothing to enumerate: without this guard the helper code itself would fail
            # and hide the actual query error.
            details = f"The queried '{category}' content is of type '{type(current_result).__name__}', which has no sub-keys to query."

        raise KiaraException(
            msg=f"Failed to retrieve value attribute using query sub-path: {path}",
            details=details,
        ) from e
def retrieve_value_info(
    self, value: Union[str, uuid.UUID, Value, Path]
) -> ValueInfo:
    """
    Retrieve an info object for a value.

    The value is resolved via [get_value][kiara.interfaces.python_api.KiaraAPI.get_value] and then
    wrapped in a 'ValueInfo' object. If you don't need the augmented information such an object
    carries, just use 'get_value' instead.

    Arguments:
        value: a value id, alias or object that has a 'value_id' attribute.

    Returns:
        the ValueInfo instance
    """
    resolved = self.get_value(value=value)
    value_info = ValueInfo.create_from_instance(kiara=self.context, instance=resolved)
    return value_info
def retrieve_values_info(self, **matcher_params) -> ValuesInfo:
    """
    Retrieve information about the matching values.

    This retrieves the same values as [list_values][kiara.interfaces.python_api.KiaraAPI.list_values]
    (it is called under the hood), and wraps them in a 'ValuesInfo' object. If you don't need the
    augmented information, just use 'list_values' instead.

    Arguments:
        matcher_params: the (optional) filter parameters, check the [ValueMatcher][kiara.models.values.matchers.ValueMatcher] class for available parameters

    Returns:
        a wrapper object created from the matching values, which are passed in keyed by their (stringified) id
    """
    matching: MutableMapping[str, Value] = self.list_values(**matcher_params)
    instances = {str(key): value for key, value in matching.items()}
    return ValuesInfo.create_from_instances(  # type: ignore
        kiara=self.context, instances=instances
    )
def list_alias_names(self, **matcher_params) -> List[str]:
    """
    List all available alias keys.

    Without matcher parameters, the names are read directly from the alias registry, without looking
    up the value behind each alias. If matcher parameters are provided,
    [list_aliases][kiara.interfaces.python_api.KiaraAPI.list_aliases] is called under the hood and its
    keys are returned, so in that case using 'list_aliases' directly is usually preferable.

    Arguments:
        matcher_params: the (optional) filter parameters, check the [ValueMatcher][kiara.models.values.matchers.ValueMatcher] class for available parameters

    Returns:
        a list of alias names
    """
    if not matcher_params:
        # fast path: just the names, no value lookups
        return list(self.context.alias_registry.all_aliases)

    matching = self.list_aliases(**matcher_params)
    return list(matching.keys())
def list_aliases(self, **matcher_params) -> ValueMapReadOnly:
    """
    List all available values that have an alias assigned, optionally filter.

    With matcher parameters, the matching values are retrieved via 'list_values' (with 'has_alias'
    forced to True), and each of them is added to the result once per alias, sorted by alias. Without,
    every alias known to the alias registry is resolved directly.

    Arguments:
        matcher_params: the (optional) filter parameters, check the [ValueMatcher][kiara.models.values.matchers.ValueMatcher] class for available parameters

    Returns:
        a read-only value map with the alias as field name, and the [kiara.models.values.value.Value] as value
    """
    if not matcher_params:
        # faster if no other matcher params
        alias_names = self.context.alias_registry.all_aliases
        by_alias: Dict[str, Value] = {}
        for alias_name in alias_names:
            by_alias[alias_name] = self.context.data_registry.get_value(
                f"alias:{alias_name}"
            )
        return ValueMapReadOnly.create_from_values(**by_alias)

    matcher_params["has_alias"] = True
    matching = self.list_values(**matcher_params)

    unsorted: Dict[str, Value] = {}
    for value in matching.values():
        value_aliases = self.context.alias_registry.find_aliases_for_value_id(
            value_id=value.value_id
        )
        for alias_name in value_aliases:
            if alias_name in unsorted:
                raise Exception(
                    f"Duplicate value alias '{alias_name}': this is most likely a bug."
                )
            unsorted[alias_name] = value

    # aliases are unique at this point, so ordering by key alone is unambiguous
    by_alias = dict(sorted(unsorted.items(), key=lambda item: item[0]))
    return ValueMapReadOnly.create_from_values(**by_alias)
def retrieve_aliases_info(self, **matcher_params) -> ValuesInfo:
    """
    Retrieve information about the matching values that have an alias.

    This retrieves the same items as [list_aliases][kiara.interfaces.python_api.KiaraAPI.list_aliases]
    (it is called under the hood), and wraps them in a 'ValuesInfo' object. If you don't need the
    augmented information, just use 'list_aliases' instead.

    Arguments:
        matcher_params: the (optional) filter parameters, check the [ValueMatcher][kiara.models.values.matchers.ValueMatcher] class for available parameters

    Returns:
        a wrapper object created from the matching values, which are passed in keyed by alias
    """
    aliased = self.list_aliases(**matcher_params)
    instances = {str(alias): value for alias, value in aliased.items()}
    return ValuesInfo.create_from_instances(  # type: ignore
        kiara=self.context, instances=instances
    )
def assemble_value_map(
    self,
    values: Mapping[str, Union[uuid.UUID, None, str, Value, Any]],
    values_schema: Union[None, Mapping[str, ValueSchema]] = None,
    register_data: bool = False,
    reuse_existing_data: bool = False,
) -> ValueMapReadOnly:
    """
    Retrieve a [ValueMap][TODO] object from the provided value ids or value links.

    By default, this method can only use values/datasets that are already registered in *kiara*. If you want to
    auto-register 'raw' data, you need to set the 'register_data' flag to 'True', and provide a schema for each of the fields that are not yet registered.

    When 'register_data' is set, each field is resolved like this:
    - 'Value' and 'uuid.UUID' instances are used as they are
    - strings starting with 'alias:' or 'archive:' are kept as references
    - strings that parse as a uuid are converted to a 'uuid.UUID'
    - other strings are turned into an 'alias:' reference if the field's schema type is not 'string' and
      the string is contained in the result of 'list_aliases'
    - 'None' is kept as 'None'
    - everything else is registered as new data, using the schema of the field

    Arguments:
        values: a dictionary with the values in question
        values_schema: an optional dictionary with the schema for each of the values that are not yet registered
        register_data: whether to allow auto-registration of 'raw' data
        reuse_existing_data: whether to reuse existing data with the same hash as the 'raw' data that is being registered

    Returns:
        a value map instance

    Raises:
        KiaraException: if 'register_data' is set and a field that is not a Value/uuid has no schema
    """
    if register_data:
        temp: Dict[str, Union[str, Value, uuid.UUID, None]] = {}
        # Looked up lazily and at most once per call: 'list_aliases' resolves every aliased value.
        # NOTE(review): assumes 'register_data' does not add aliases while the loop runs -- confirm.
        known_aliases: Union[None, ValueMapReadOnly] = None

        for k, v in values.items():
            if isinstance(v, (Value, uuid.UUID)):
                temp[k] = v
                continue

            if not values_schema:
                details = "No schema provided."
                raise KiaraException(
                    f"Invalid field name: '{k}' (value: {v}).", details=details
                )
            if k not in values_schema.keys():
                details = "Valid field names: " + ", ".join(values_schema.keys())
                raise KiaraException(
                    f"Invalid field name: '{k}' (value: {v}).", details=details
                )

            if isinstance(v, str):
                if v.startswith(("alias:", "archive:")):
                    # explicit references are passed through untouched
                    temp[k] = v
                    continue

                try:
                    temp[k] = uuid.UUID(v)
                    continue
                except ValueError:
                    # not a uuid string, check whether it is an alias name below
                    pass

                if values_schema[k].type != "string":
                    if known_aliases is None:
                        known_aliases = self.list_aliases()
                    if v in known_aliases:
                        temp[k] = f"alias:{v}"
                        continue

            if v is None:
                temp[k] = None
            else:
                temp[k] = self.register_data(
                    data=v,
                    data_type=values_schema[k],
                    reuse_existing=reuse_existing_data,
                )
        values = temp

    return self.context.data_registry.load_values(
        values=values, values_schema=values_schema
    )
def store_value(
    self,
    value: Union[str, uuid.UUID, Value],
    alias: Union[str, Iterable[str], None],
    allow_overwrite: bool = True,
    data_store: Union[str, None] = None,
    alias_store: Union[str, None] = None,
) -> StoreValueResult:
    """
    Store the specified value in the (default) value store.

    This method does not raise an error if the storing of the value fails, so you have to investigate the
    'StoreValueResult' instance that is returned to see if the storing was successful. (Resolving the
    'value' reference happens before that, and is not covered by this guarantee.)

    Arguments:
        value: the value (or a reference to it)
        alias: (Optional) aliases for the value, either a single string or an iterable of strings
        allow_overwrite: whether to allow overwriting existing aliases
        data_store: passed on to the data registry when storing the value
        alias_store: passed on to the alias registry when registering the aliases

    Returns:
        a result object containing the value, the (sorted) aliases, the persisted data (if storing
        succeeded), and an error message (if something failed)
    """
    # Materialize the aliases exactly once. A one-shot iterable (e.g. a generator) would otherwise
    # be exhausted when registering the aliases, and show up empty in the result.
    if alias is None:
        aliases: List[str] = []
    elif isinstance(alias, str):
        aliases = [alias]
    else:
        aliases = list(alias)

    value_obj = self.get_value(value)
    persisted_data: Union[None, PersistedData] = None
    try:
        persisted_data = self.context.data_registry.store_value(
            value=value_obj, data_store=data_store
        )
        if aliases:
            self.context.alias_registry.register_aliases(
                value_obj.value_id,
                *aliases,
                allow_overwrite=allow_overwrite,
                alias_store=alias_store,
            )
        result = StoreValueResult(
            value=value_obj,
            aliases=sorted(aliases),
            error=None,
            persisted_data=persisted_data,
        )
    except Exception as e:
        # deliberate best-effort: failures are logged and reported via the result object
        log_exception(e)
        result = StoreValueResult(
            value=value_obj,
            aliases=sorted(aliases),
            error=(
                str(e) if str(e) else f"Unknown error (type '{type(e).__name__}')."
            ),
            persisted_data=persisted_data,
        )
    return result
def store_values(
    self,
    values: Union[
        Mapping[str, Union[str, uuid.UUID, Value]],
        Iterable[Union[str, uuid.UUID, Value]],
    ],
    alias_map: Union[Mapping[str, Iterable[str]], bool, str] = False,
    allow_overwrite: bool = True,
    data_store: Union[str, None] = None,
    alias_store: Union[str, None] = None,
) -> StoreValuesResult:
    """
    Store multiple values into the (default) kiara value store.

    Each value is resolved via 'get_value' and stored via 'store_value'.

    If 'values' is a non-mapping iterable, 'alias_map' must be 'False' (using aliases is not possible),
    and the results are keyed 'value_<index>'.

    If 'values' is a mapping, the results are keyed by field name, and 'alias_map' determines the aliases:
    - 'False': no aliases are registered
    - 'True': the field name is used as alias
    - a string: the alias is '<alias_map>.<field_name>'
    - a mapping: the aliases are looked up in it, using the field name as key

    This method does not raise an error if the storing of a value fails, so you have to investigate the
    'StoreValuesResult' instance that is returned to see if the storing was successful.

    Arguments:
        values: an iterable/map of value keys/values
        alias_map: a map of value keys aliases (or one of the shortcuts described above)
        allow_overwrite: passed on to 'store_value'
        data_store: passed on to 'store_value'
        alias_store: passed on to 'store_value'

    Returns:
        an object outlining which values (identified by the specified value key or an enumerated index) were stored and how

    Raises:
        KiaraException: if aliases are requested for a non-mapping 'values' argument
    """
    stored = {}

    if isinstance(values, Mapping):
        for field_name, value in values.items():
            # the four cases are mutually exclusive, so their order does not matter
            if isinstance(alias_map, str):
                aliases: Union[None, Iterable[str]] = [f"{alias_map}.{field_name}"]
            elif alias_map is True:
                aliases = [field_name]
            elif alias_map is False:
                aliases = None
            else:
                # means it's a mapping
                aliases = alias_map.get(field_name)

            resolved = self.get_value(value)
            stored[field_name] = self.store_value(
                value=resolved,
                alias=aliases,
                allow_overwrite=allow_overwrite,
                data_store=data_store,
                alias_store=alias_store,
            )
        return StoreValuesResult(root=stored)

    if alias_map is not False:
        raise KiaraException(
            msg="Cannot use aliases with non-mapping iterable."
        )

    for idx, value in enumerate(values):
        resolved = self.get_value(value)
        stored[f"value_{idx}"] = self.store_value(
            value=resolved,
            alias=None,
            allow_overwrite=allow_overwrite,
            data_store=data_store,
            alias_store=alias_store,
        )
    return StoreValuesResult(root=stored)
# ------------------------------------------------------------------------------------------------------------------
# operation-related methods
def get_operation_type(self, op_type: Union[str, Type[OP_TYPE]]) -> OperationType:
    """
    Get the management object for the specified operation type.

    Arguments:
        op_type: the operation type, passed on unchanged to the operation registry

    Returns:
        the operation type object the operation registry of the current context returns
    """
    registry = self.context.operation_registry
    return registry.get_operation_type(op_type=op_type)
def retrieve_operation_type_info(
    self, op_type: Union[str, Type[OP_TYPE]]
) -> OperationTypeInfo:
    """
    Get an info object for the specified operation type.

    The operation type is resolved via 'get_operation_type', the info object is then created from
    the class of the resolved object.

    Arguments:
        op_type: the operation type, in any form 'get_operation_type' accepts

    Returns:
        the info object for the operation type class
    """
    resolved = self.get_operation_type(op_type=op_type)
    type_cls = resolved.__class__
    return OperationTypeInfo.create_from_type_class(
        kiara=self.context, type_cls=type_cls
    )
def find_operation_id(
    self, module_type: str, module_config: Union[None, Mapping[str, Any]] = None
) -> Union[None, str]:
    """
    Try to find the registered operation id for the specified module type and configuration.

    A manifest is created from the module type and config first, which is then used for the
    lookup in the operation registry.

    Arguments:
        module_type: the module type
        module_config: the module configuration

    Returns:
        the registered operation id, if found, or None
    """
    lookup_manifest = self.context.create_manifest(
        module_or_operation=module_type, config=module_config
    )
    registry = self.context.operation_registry
    return registry.find_operation_id(manifest=lookup_manifest)
def assemble_filter_pipeline_config(
    self,
    data_type: str,
    filters: Union[str, Iterable[str], Mapping[str, str]],
    endpoint: Union[None, Manifest, str] = None,
    endpoint_input_field: Union[str, None] = None,
    endpoint_step_id: Union[str, None] = None,
    extra_input_aliases: Union[None, Mapping[str, str]] = None,
    extra_output_aliases: Union[None, Mapping[str, str]] = None,
) -> "PipelineConfig":
    """
    Assemble a (pipeline) module config to filter values of a specific data type.

    Optionally, a module that uses the filtered dataset as input can be specified. The actual work
    is done by the 'filter' operation type, all arguments are forwarded to it unchanged.

    # TODO: document filter names

    For the 'filters' argument, the accepted inputs are:
    - a string, in which case a single-step pipeline will be created, with the string referencing the operation id or filter
    - a list of strings: in which case a multi-step pipeline will be created, the step_ids will be calculated automatically
    - a map of string pairs: the keys are step ids, the values operation ids or filter names

    Arguments:
        data_type: the type of the data to filter
        filters: a list of operation ids or filter names (and potentially step_ids if type is a mapping)
        endpoint: optional module to put as last step in the created pipeline
        endpoint_input_field: field name of the input that will receive the filtered value
        endpoint_step_id: id to use for the endpoint step (module type name will be used if not provided)
        extra_input_aliases: extra input aliases to add to the pipeline config
        extra_output_aliases: extra output aliases to add to the pipeline config

    Returns:
        the (pipeline) module configuration of the filter pipeline
    """
    op_registry = self.context.operation_registry
    filter_op_type: FilterOperationType = op_registry.get_operation_type("filter")  # type: ignore
    return filter_op_type.assemble_filter_pipeline_config(
        data_type=data_type,
        filters=filters,
        endpoint=endpoint,
        endpoint_input_field=endpoint_input_field,
        endpoint_step_id=endpoint_step_id,
        extra_input_aliases=extra_input_aliases,
        extra_output_aliases=extra_output_aliases,
    )
# ------------------------------------------------------------------------------------------------------------------
# render-related methods
def retrieve_renderer_infos(
    self, source_type: Union[str, None] = None, target_type: Union[str, None] = None
) -> RendererInfos:
    """Retrieve information about the available renderers.

    Which renderers are included depends on the provided filters:
    - no filter: all registered renderers
    - source type only: the renderers for that source type
    - source and target type: the renderers for that combination
    - target type only: not supported

    Arguments:
        source_type: the type of the item to render (optional filter)
        target_type: the type/profile of the rendered result (optional filter)

    Returns:
        a wrapper object created from the matching renderers, which are passed in keyed by their renderer alias

    Raises:
        KiaraException: if only a target type is provided
    """
    if target_type and not source_type:
        raise KiaraException(msg="Cannot retrieve renderers for target type only.")

    render_registry = self.context.render_registry
    if source_type and target_type:
        matching = render_registry.retrieve_renderers_for_source_target_combination(
            source_type=source_type, target_type=target_type  # type: ignore
        )
    elif source_type:
        matching = render_registry.retrieve_renderers_for_source_type(
            source_type=source_type
        )
    else:
        matching = render_registry.registered_renderers

    by_alias = {renderer.get_renderer_alias(): renderer for renderer in matching}
    return RendererInfos.create_from_instances(  # type: ignore
        kiara=self.context, instances=by_alias
    )
def retrieve_renderers_for(self, source_type: str) -> List[KiaraRenderer]:
    """
    Retrieve the renderers that are available for the specified source type.

    Arguments:
        source_type: the type of the item to render

    Returns:
        the renderers the render registry of the current context returns for the source type
    """
    render_registry = self.context.render_registry
    return render_registry.retrieve_renderers_for_source_type(source_type=source_type)
def render(
    self,
    item: Any,
    source_type: str,
    target_type: str,
    render_config: Union[Mapping[str, Any], None] = None,
) -> Any:
    """Render an internal instance of a supported source type into one of the supported target types.

    To find out the supported source/target combinations, you can use the kiara cli:

    ```
    kiara render list-renderers
    ```

    or, for a filtered list:

    ```
    kiara render --source-type pipeline list-renderers
    ```

    What Python types are actually supported for the 'item' argument depends on the source_type of the
    renderer you are calling, for example if that is a pipeline, most of the ways to specify a pipeline
    would be supported (operation_id, pipeline file, etc.).

    The call is delegated to the render registry of the current context, all arguments are forwarded unchanged.

    Arguments:
        item: the item to render
        source_type: the type of the item to render
        target_type: the type/profile of the rendered result
        render_config: optional configuration, depends on the renderer that is called

    Returns:
        the result of the render call
    """
    return self.context.render_registry.render(
        item=item,
        source_type=source_type,
        target_type=target_type,
        render_config=render_config,
    )
def assemble_render_pipeline(
    self,
    data_type: str,
    target_format: Union[str, Iterable[str]] = "string",
    filters: Union[None, str, Iterable[str], Mapping[str, str]] = None,
    use_pretty_print: bool = False,
) -> Operation:
    """
    Create an operation that renders a value of the specified data type in the target format.

    NOTE: this is a preliminary endpoint, don't use in anger yet.

    If a list is provided as value for 'target_format', all items are tried in order until one is found
    for which a render operation is registered for the data type.

    Without filters, the matching render operation is returned as is. With filters, a filter pipeline
    is assembled (via 'assemble_filter_pipeline_config') that uses the render operation as endpoint, and
    an operation created from that pipeline is returned.

    Arguments:
        data_type: the data type of the values to render
        target_format: the format(s) into which to render the value
        filters: a list of filters to apply to the value before rendering it
        use_pretty_print: if True, use a 'pretty_print' operation instead of 'render_value'

    Returns:
        the operation for the transformation

    Raises:
        DataTypeUnknownException: if the data type is not known in the current context
        Exception: if no render operation is registered for any of the target formats
    """
    if data_type not in self.context.data_type_names:
        raise DataTypeUnknownException(data_type=data_type)

    op_registry = self.context.operation_registry
    if use_pretty_print:
        pretty_print_op_type: PrettyPrintOperationType = op_registry.get_operation_type("pretty_print")  # type: ignore
        ops = pretty_print_op_type.get_target_types_for(data_type)
    else:
        render_op_type: RenderValueOperationType = op_registry.get_operation_type("render_value")  # type: ignore
        ops = render_op_type.get_render_operations_for_source_type(data_type)

    if isinstance(target_format, str):
        target_format = [target_format]

    # first target format that has a registered operation wins
    match = next(
        (ops[_target_type] for _target_type in target_format if _target_type in ops.keys()),
        None,
    )

    if not match:
        if ops:
            msg = f"Registered target types for source type '{data_type}': {', '.join(ops.keys())}."
        else:
            msg = f"No render operations registered for source type '{data_type}'."
        raise Exception(
            f"No render operation for source type '{data_type}' to target type(s) registered: '{', '.join(target_format)}'. {msg}"
        )

    if not filters:
        return match

    # filter_op_type: FilterOperationType = self._kiara.operation_registry.get_operation_type("filter")  # type: ignore
    # wrap the render operation as last step of a filter pipeline, and expose its
    # config input/result output on the pipeline level
    pipeline_config = self.assemble_filter_pipeline_config(
        data_type=data_type,
        filters=filters,
        endpoint=Manifest(
            module_type=match.module_type, module_config=match.module_config
        ),
        endpoint_input_field="value",
        endpoint_step_id="render_value",
        extra_input_aliases={"render_value.render_config": "render_config"},
        extra_output_aliases={
            "render_value.render_value_result": "render_value_result"
        },
    )

    pipeline_manifest = Manifest(
        module_type="pipeline", module_config=pipeline_config.model_dump()
    )
    module = self.context.module_registry.create_module(manifest=pipeline_manifest)
    return Operation.create_from_module(module, doc=pipeline_config.doc)
# ------------------------------------------------------------------------------------------------------------------
# job-related methods
def queue_manifest(
    self, manifest: Manifest, inputs: Union[None, Mapping[str, Any]] = None
) -> uuid.UUID:
    """
    Queue a job using the provided manifest to describe the module and config that should be executed.

    The job config is prepared by the job registry of the current context, and the job is then
    executed with 'wait=False'.

    Arguments:
        manifest: the manifest
        inputs: the job inputs (can be either references to values, or raw inputs); 'None' is treated as no inputs

    Returns:
        the id of the queued job
    """
    job_inputs = {} if inputs is None else inputs
    job_registry = self.context.job_registry
    prepared = job_registry.prepare_job_config(manifest=manifest, inputs=job_inputs)
    return job_registry.execute_job(job_config=prepared, wait=False)
def run_manifest(
    self, manifest: Manifest, inputs: Union[None, Mapping[str, Any]] = None
) -> ValueMapReadOnly:
    """
    Run a job using the provided manifest to describe the module and config that should be executed.

    The job is queued via 'queue_manifest', then its result is retrieved from the job registry.

    Arguments:
        manifest: the manifest
        inputs: the job inputs (can be either references to values, or raw inputs)

    Returns:
        a result value map instance
    """
    queued_id = self.queue_manifest(manifest=manifest, inputs=inputs)
    job_result = self.context.job_registry.retrieve_result(job_id=queued_id)
    return job_result
def queue_job(
    self,
    operation: Union[str, Path, Manifest, OperationInfo, JobDesc],
    inputs: Union[Mapping[str, Any], None] = None,
    operation_config: Union[None, Mapping[str, Any]] = None,
) -> uuid.UUID:
    """
    Queue a job from an operation id, module_name (and config), pipeline file or job description, and return its job id.

    This method does not retrieve the job result; use the returned id with 'get_job' / 'get_job_result' for that.

    This is a convenience method that auto-detects what is meant by the 'operation' string input argument.

    If 'operation' is the path to a file that contains a mapping with an 'operation' key, the file is treated as a
    job description: the inputs it contains are used as defaults, and are overridden by the 'inputs' argument.

    If the 'operation' is a JobDesc instance, and that JobDesc instance has the 'save' attribute
    set, it will be ignored, so you'll have to store any results manually.

    Arguments:
        operation: a module name, operation id, or a path to a pipeline file (resolved in this order, until a match is found)..
        inputs: the operation inputs ('None' is treated as 'no inputs')
        operation_config: the (optional) module config in case 'operation' is a module name

    Returns:
        the queued job id

    Raises:
        KiaraException: if a job description file can't be parsed, or if 'operation_config' is combined with a JobDesc
        Exception: if 'operation' is a Path that does not point to a file
    """
    if inputs is None:
        inputs = {}

    if isinstance(operation, str):
        if os.path.isfile(operation):
            # the isfile check above already guarantees this is an existing file
            job_path = Path(operation)
            op_data = get_data_from_file(job_path)
            if isinstance(op_data, Mapping) and "operation" in op_data:
                # the file is a job description, not a plain pipeline
                try:
                    repl_dict: Dict[str, Any] = {
                        "this_dir": job_path.parent.as_posix()
                    }
                    job_data = replace_var_names_in_obj(
                        op_data, repl_dict=repl_dict
                    )
                    job_data["job_alias"] = job_path.stem
                    job_desc = JobDesc(**job_data)
                    _operation: Union[Manifest, str] = job_desc.get_operation(
                        kiara_api=self
                    )
                    if job_desc.inputs:
                        # explicitly provided inputs win over the ones from the file
                        _inputs = dict(job_desc.inputs)
                        _inputs.update(inputs)
                        inputs = _inputs
                except Exception as e:
                    raise KiaraException(
                        f"Failed to parse job description file: {operation}",
                        parent=e,
                    )
            else:
                _operation = job_path.as_posix()
        else:
            # not a file, so let 'create_operation' resolve the string
            _operation = operation
    elif isinstance(operation, Path):
        if not operation.is_file():
            raise Exception(
                f"Can't queue job from file '{operation.as_posix()}': file does not exist/not a file."
            )
        _operation = operation.as_posix()
    elif isinstance(operation, OperationInfo):
        _operation = operation.operation
    elif isinstance(operation, JobDesc):
        if operation_config:
            raise KiaraException(
                "Specifying 'operation_config' when operation is a job_desc is invalid."
            )
        _operation = operation.get_operation(kiara_api=self)
        if operation.inputs:
            # explicitly provided inputs win over the ones from the job description
            _inputs = dict(operation.inputs)
            _inputs.update(inputs)
            inputs = _inputs
    else:
        _operation = operation

    if isinstance(_operation, Manifest):
        manifest: Manifest = _operation
    else:
        manifest = create_operation(
            module_or_operation=_operation,
            operation_config=operation_config,
            kiara=self.context,
        )

    return self.queue_manifest(manifest=manifest, inputs=inputs)
def run_job(
    self,
    operation: Union[str, Path, Manifest, OperationInfo, JobDesc],
    inputs: Union[None, Mapping[str, Any]] = None,
    operation_config: Union[None, Mapping[str, Any]] = None,
) -> ValueMapReadOnly:
    """
    Run a job from an operation id, module_name (and config), or pipeline file, and retrieve the result.

    The job is queued via 'queue_job' (which auto-detects what is meant by the 'operation' argument), then its
    result is retrieved from the job registry.

    In general, try to avoid this method and use 'queue_job', 'get_job' and 'get_job_result' manually instead,
    since this is a blocking operation.

    If the 'operation' is a JobDesc instance, and that JobDesc instance has the 'save' attribute
    set, it will be ignored, so you'll have to store any results manually.

    Arguments:
        operation: a module name, operation id, or a path to a pipeline file (resolved in this order, until a match is found)..
        inputs: the operation inputs
        operation_config: the (optional) module config in case 'operation' is a module name

    Returns:
        the job result value map
    """
    queued_id = self.queue_job(
        operation=operation,
        inputs={} if inputs is None else inputs,
        operation_config=operation_config,
    )
    job_result = self.context.job_registry.retrieve_result(job_id=queued_id)
    return job_result
def get_job(self, job_id: Union[str, uuid.UUID]) -> ActiveJob:
    """Look up the job with the provided id in the job registry.

    A string id is parsed into a 'uuid.UUID' before the lookup.
    """
    parsed_id = uuid.UUID(job_id) if isinstance(job_id, str) else job_id
    return self.context.job_registry.get_job(job_id=parsed_id)
def get_job_result(self, job_id: Union[str, uuid.UUID]) -> ValueMapReadOnly:
    """Retrieve the result(s) of the job with the provided id from the job registry.

    A string id is parsed into a 'uuid.UUID' before the lookup.
    """
    parsed_id = uuid.UUID(job_id) if isinstance(job_id, str) else job_id
    return self.context.job_registry.retrieve_result(job_id=parsed_id)
def render_value(
    self,
    value: Union[str, uuid.UUID, Value],
    target_format: Union[str, Iterable[str]] = "string",
    filters: Union[None, Iterable[str], Mapping[str, str]] = None,
    render_config: Union[Mapping[str, str], None] = None,
    add_root_scenes: bool = True,
    use_pretty_print: bool = False,
) -> RenderValueResult:
    """
    Render a value in the specified target format.

    NOTE: this is a preliminary endpoint, don't use in anger yet.

    If a list is provided as value for 'target_format', all items are tried until a 'render_value' operation is found that matches
    the value type of the source value, and the provided target format.

    If no render pipeline can be assembled for the value's data type, the failure is logged and the operation
    registered for source type 'any' and the target format is used instead (this fallback only supports a single,
    string 'target_format').

    Arguments:
        value: the value (or value id)
        target_format: the format into which to render the value
        filters: an (optional) list of filters
        render_config: manifest specific render configuration
        add_root_scenes: add root scenes to the result (NOTE(review): not referenced in this method's body)
        use_pretty_print: use 'pretty_print' operation instead of 'render_value'

    Returns:
        the rendered value data, and any related scenes, if applicable

    Raises:
        NotImplementedError: if the fallback lookup is needed and 'target_format' is not a single string
        Exception: if no render operation can be found, or (without 'use_pretty_print') the operation result
            is not of data type 'render_value_result'
    """
    _value = self.get_value(value)
    try:
        render_operation: Union[None, Operation] = self.assemble_render_pipeline(
            data_type=_value.data_type_name,
            target_format=target_format,
            filters=filters,
            use_pretty_print=use_pretty_print,
        )
    except Exception as e:
        # assembling a type-specific pipeline failed: log it, then fall back to the operation for source type 'any'
        log_message(
            "create_render_pipeline.failure",
            source_type=_value.data_type_name,
            target_format=target_format,
            error=e,
        )
        if use_pretty_print:
            pretty_print_ops: PrettyPrintOperationType = self.context.operation_registry.get_operation_type("pretty_print")  # type: ignore
            if not isinstance(target_format, str):
                raise NotImplementedError(
                    "Can't handle multiple target formats for 'render_value' yet."
                )
            render_operation = (
                pretty_print_ops.get_operation_for_render_combination(
                    source_type="any", target_type=target_format
                )
            )
        else:
            render_ops: RenderValueOperationType = self.context.operation_registry.get_operation_type("render_value")  # type: ignore
            if not isinstance(target_format, str):
                raise NotImplementedError(
                    "Can't handle multiple target formats for 'render_value' yet."
                )
            render_operation = render_ops.get_render_operation(
                source_type="any", target_type=target_format
            )
    if render_operation is None:
        raise Exception(
            f"Could not find render operation for value '{_value.value_id}', type: {_value.value_schema.type}"
        )
    # a 'render_config' that itself contains a 'render_config' key is unwrapped, only the nested item is used
    if render_config and "render_config" in render_config.keys():
        # raise NotImplementedError()
        # TODO: is this necessary?
        render_config = render_config["render_config"]  # type: ignore
        # manifest_hash = render_config["manifest_hash"]
        # if manifest_hash != render_operation.manifest_hash:
        #     raise NotImplementedError(
        #         "Using a non-default render operation is not supported (yet)."
        #     )
        # render_config = render_config["render_config"]
    # always hand a plain (copied) dict to the operation
    if render_config is None:
        render_config = {}
    else:
        render_config = dict(render_config)
    # render_type = render_config.pop("render_type", None)
    # if not render_type or render_type == "data":
    #     pass
    # elif render_type == "metadata":
    #     pass
    # elif render_type == "properties":
    #     pass
    # elif render_type == "lineage":
    #     pass
    result = render_operation.run(
        kiara=self.context,
        inputs={"value": _value, "render_config": render_config},
    )
    # the name of the result field depends on which kind of operation was used
    if use_pretty_print:
        render_result: Value = result["rendered_value"]
        value_render_data: RenderValueResult = render_result.data
    else:
        render_result = result["render_value_result"]
        # make sure the operation produced a value of the expected data type before unwrapping it
        if render_result.data_type_name != "render_value_result":
            raise Exception(
                f"Invalid result type for render operation: {render_result.data_type_name}"
            )
        value_render_data = render_result.data  # type: ignore
    return value_render_data
# ------------------------------------------------------------------------------------------------------------------
# workflow-related methods
# all of the workflow-related methods are provisional experiments, so don't rely on them to be available long term
def list_workflow_ids(self) -> List[uuid.UUID]:
    """Return the ids of all workflows known to the workflow registry, as a new list.

    NOTE: this is a provisional endpoint, don't use in anger yet
    """
    registry = self.context.workflow_registry
    return [*registry.all_workflow_ids]
def list_workflow_alias_names(self) -> List[str]:
    """Return the names of all workflow aliases known to the workflow registry, as a new list.

    NOTE: this is a provisional endpoint, don't use in anger yet
    """
    alias_map = self.context.workflow_registry.workflow_aliases
    return [*alias_map.keys()]
def get_workflow(
    self, workflow: Union[str, uuid.UUID], create_if_necessary: bool = True
) -> "Workflow":
    """Retrieve the workflow instance with the specified id or alias.

    A string is first interpreted as a workflow id; if it is not a valid uuid, it is treated as an alias and
    resolved via the workflow registry. Workflow instances are cached by their id.

    NOTE: this is a provisional endpoint, don't use in anger yet

    Arguments:
        workflow: the workflow id (uuid or its string form) or a workflow alias
        create_if_necessary: if the alias can't be resolved, create a new workflow with that alias instead of failing

    Returns:
        the (cached, loaded or newly created) workflow instance

    Raises:
        NoSuchWorkflowException: if the alias can't be resolved and 'create_if_necessary' is False
        Exception: if no workflow reference was provided ('None')
    """
    # imported here (not at module level), like the other workflow-related methods of this class do
    from kiara.interfaces.python_api.workflow import Workflow

    no_such_alias: bool = False
    workflow_id: Union[uuid.UUID, None] = None
    workflow_alias: Union[str, None] = None

    if isinstance(workflow, str):
        try:
            workflow_id = uuid.UUID(workflow)
        except ValueError:
            # not a uuid string, so it must be an alias
            workflow_alias = workflow
            try:
                workflow_id = self.context.workflow_registry.get_workflow_id(
                    workflow_alias=workflow
                )
            except NoSuchWorkflowException:
                no_such_alias = True
    elif workflow is None:
        raise Exception(f"Can't retrieve workflow for: {workflow}")
    else:
        workflow_id = workflow

    if workflow_id is not None:
        if workflow_id in self._workflow_cache:
            return self._workflow_cache[workflow_id]

        workflow_obj = Workflow(kiara=self.context, workflow=workflow_id)
        self._workflow_cache[workflow_obj.workflow_id] = workflow_obj
        return workflow_obj

    # no id could be determined for the provided reference
    if not create_if_necessary:
        if not no_such_alias:
            msg = f"No workflow with id '{workflow}' registered."
        else:
            msg = f"No workflow with alias '{workflow}' registered."
        raise NoSuchWorkflowException(workflow=workflow, msg=msg)

    # means we need to create it
    return self.create_workflow(workflow_alias=workflow_alias)
def retrieve_workflow_info(
    self, workflow: Union[str, uuid.UUID, "Workflow"]
) -> WorkflowInfo:
    """Create a 'WorkflowInfo' object for the specified workflow.

    A 'Workflow' instance is used as is, anything else (id or alias) is resolved via 'get_workflow' first.

    NOTE: this is a provisional endpoint, don't use in anger yet
    """
    from kiara.interfaces.python_api.workflow import Workflow

    resolved: Workflow = (
        workflow if isinstance(workflow, Workflow) else self.get_workflow(workflow)
    )
    return WorkflowInfo.create_from_workflow(workflow=resolved)
def list_workflows(self, **matcher_params) -> "WorkflowsMap":
    """List workflow sessions, indexed by their (stringified) unique id.

    The keyword arguments are used to create a 'WorkflowMatcher'. If its 'has_alias' attribute is set, only
    workflows that are registered under an alias are included, otherwise all registered workflows are.
    """
    from kiara.interfaces.python_api.models.doc import WorkflowsMap
    from kiara.interfaces.python_api.models.workflow import WorkflowMatcher

    matcher = WorkflowMatcher(**matcher_params)
    registry = self.context.workflow_registry

    found = {}
    if matcher.has_alias:
        # only the ids that have an alias pointing to them
        for aliased_id in registry.workflow_aliases.values():
            wf = self.get_workflow(workflow=aliased_id)
            found[wf.workflow_id] = wf
    else:
        for wf_id in registry.all_workflow_ids:
            found[wf_id] = self.get_workflow(workflow=wf_id)

    return WorkflowsMap(root={str(key): wf for key, wf in found.items()})
def list_workflow_aliases(self, **matcher_params) -> "WorkflowsMap":
    """List all available workflow sessions that have an alias, indexed by (sorted) alias.

    If matcher parameters are provided, 'has_alias' is forced to True and the matching workflows are looked up
    via 'list_workflows'; otherwise the alias map of the workflow registry is used directly.

    NOTE: this is a provisional endpoint, don't use in anger yet
    """
    from kiara.interfaces.python_api.models.doc import WorkflowsMap
    from kiara.interfaces.python_api.workflow import Workflow

    by_alias: Dict[str, Workflow]
    if not matcher_params:
        # faster if not other matcher params
        alias_map = self.context.workflow_registry.workflow_aliases
        by_alias = {
            name: self.get_workflow(workflow=alias_map[name])
            for name in sorted(alias_map.keys())
        }
    else:
        matcher_params["has_alias"] = True
        matched = self.list_workflows(**matcher_params)

        collected: Dict[str, Workflow] = {}
        for wf in matched.values():
            wf_aliases = self.context.workflow_registry.get_aliases(
                workflow_id=wf.workflow_id
            )
            for name in wf_aliases:
                # every alias is expected to belong to a single workflow only
                if name in collected:
                    raise Exception(
                        f"Duplicate workflow alias '{name}': this is most likely a bug."
                    )
                collected[name] = wf
        by_alias = {name: collected[name] for name in sorted(collected)}

    return WorkflowsMap(root=by_alias)
def retrieve_workflows_info(self, **matcher_params: Any) -> WorkflowGroupInfo:
    """Create a 'WorkflowGroupInfo' for the workflows returned by 'list_workflows' for the provided matcher parameters.

    NOTE: this is a provisional endpoint, don't use in anger yet
    """
    members = tuple(self.list_workflows(**matcher_params).values())
    registry_aliases = self.context.workflow_registry.workflow_aliases
    return WorkflowGroupInfo.create_from_workflows(
        *members, group_title=None, alias_map=registry_aliases
    )
def retrieve_workflow_aliases_info(
    self, **matcher_params: Any
) -> WorkflowGroupInfo:
    """Create a 'WorkflowGroupInfo' for the workflows returned by 'list_workflow_aliases' for the provided matcher parameters.

    NOTE: this is a provisional endpoint, don't use in anger yet
    """
    members = tuple(self.list_workflow_aliases(**matcher_params).values())
    registry_aliases = self.context.workflow_registry.workflow_aliases
    return WorkflowGroupInfo.create_from_workflows(
        *members, group_title=None, alias_map=registry_aliases
    )
def create_workflow(
    self,
    workflow_alias: Union[None, str] = None,
    initial_pipeline: Union[None, Path, str, Mapping[str, Any]] = None,
    initial_inputs: Union[None, Mapping[str, Any]] = None,
    documentation: Union[Any, None] = None,
    save: bool = False,
    force_alias: bool = False,
) -> "Workflow":
    """Create a workflow instance.

    NOTE: this is a provisional endpoint, don't use in anger yet

    Arguments:
        workflow_alias: an (optional) alias for the new workflow, must not be parseable as a uuid
        initial_pipeline: an (optional) reference to a pipeline operation whose steps and input/output aliases are added to the workflow
        initial_inputs: (optional) inputs to set on the new workflow, applied after the defaults of the initial pipeline
        documentation: (optional) documentation for the workflow metadata
        save: whether to save the workflow after creating it
        force_alias: when saving, unregister the alias first (only has an effect if 'save' and 'workflow_alias' are set)

    Returns:
        the new workflow instance (which is also added to the workflow cache)

    Raises:
        Exception: if the provided alias is a uuid
        NotImplementedError: if 'initial_pipeline' resolves to an operation that is not a pipeline
    """
    if workflow_alias is not None:
        # the error must be raised outside of the 'try' block, otherwise a blanket
        # 'except' would swallow it and the check would never fail
        try:
            uuid.UUID(workflow_alias)
        except (ValueError, AttributeError, TypeError):
            # not a uuid, so it can be used as alias
            pass
        else:
            raise Exception(
                f"Can't create workflow, provided alias can't be a uuid: {workflow_alias}."
            )

    # imported here (not at module level), like the other workflow-related methods of this class do
    from kiara.interfaces.python_api.workflow import Workflow

    workflow_id = ID_REGISTRY.generate()
    metadata = WorkflowMetadata(
        workflow_id=workflow_id, documentation=documentation
    )
    workflow_obj = Workflow(kiara=self.context, workflow=metadata)
    if workflow_alias:
        workflow_obj._pending_aliases.add(workflow_alias)

    if initial_pipeline:
        operation = self.get_operation(operation=initial_pipeline)
        if operation.module_type != "pipeline":
            raise NotImplementedError()

        pipeline_details: PipelineOperationDetails = operation.operation_details  # type: ignore
        workflow_obj.add_steps(*pipeline_details.pipeline_config.steps)
        input_aliases = pipeline_details.pipeline_config.input_aliases
        for k, v in input_aliases.items():
            workflow_obj.set_input_alias(input_field=k, alias=v)
        output_aliases = pipeline_details.pipeline_config.output_aliases
        for k, v in output_aliases.items():
            workflow_obj.set_output_alias(output_field=k, alias=v)

        workflow_obj.set_inputs(**operation.module.config.defaults)

    if initial_inputs:
        workflow_obj.set_inputs(**initial_inputs)

    self._workflow_cache[workflow_obj.workflow_id] = workflow_obj

    if save:
        if force_alias and workflow_alias:
            self.context.workflow_registry.unregister_alias(workflow_alias)
        workflow_obj.save()

    return workflow_obj
def _repr_html_(self):
    """Return the 'text/html' entry of the mime bundle of this object's context info renderable."""
    renderable = self.get_context_info().create_renderable()
    bundle = renderable._repr_mimebundle_(include=[], exclude=[])  # type: ignore
    return bundle["text/html"]