Repository URL to install this package:
|
Version:
0.5.11.dev7+g32c33142e ▾
|
# -*- coding: utf-8 -*-
from kiara.api import KiaraAPI
from kiara.utils.cli import terminal_print
"""{{ pipeline.doc.full_doc }}
"""
kiara = KiaraAPI.instance()
# ==============================================================================
# Specify all the pipeline inputs here
{% for field_name, field_schema in pipeline.pipeline_inputs_schema.items() %}
# {{ field_schema.doc.description }}
pipeline_input_{{ field_name }} = {{ pipeline_inputs[field_name] }}
{% endfor %}
# ==============================================================================
{%- set pipeline_input_refs = pipeline.pipeline_input_refs -%}
{%- for idx, steps in pipeline.get_steps_by_stage().items() %}
# Processing stage: {{ idx }}
{%- for step_id, step in steps.items() -%}
{% set step_pipeline_inputs = step.find_pipeline_inputs(pipeline) %}
{% set step_pipeline_outputs = step.find_pipeline_outputs(pipeline) %}
# Processing step: {{ step.step_id }}
inputs_{{ step_id }} = {
{% for field_name, input_ref in step_pipeline_inputs.items() %} '{{ field_name }}': pipeline_input_{{ input_ref.value_name }},
{% endfor -%}
{% for field_name, input_ref in step.input_links.items() %} '{{ field_name }}': results_{{ input_ref[0].step_id }}['{{ input_ref[0].value_name }}'],
{% endfor -%}
}
{% if not step.manifest_src.module_config -%}
results_{{ step_id }} = kiara.run_job('{{ step.manifest_src.module_type }}', inputs=inputs_{{ step_id }}, comment="")
{% else -%}
step_config_{{ step_id }} = {{ step.manifest_src.module_config }}
results_{{ step_id }} = kiara.run_job('{{ step.manifest_src.module_type }}', operation_config=step_config_{{ step_id }}, inputs=inputs_{{ step_id }}, comment="")
{% endif -%}
{% endfor -%}
{% endfor %}
# Print pipeline outputs
{%- set pipeline_output_refs = pipeline.pipeline_output_refs -%}
{% for field_name, output_ref in pipeline_output_refs.items() %}
pipeline_result_{{ field_name }} = results_{{ output_ref.connected_output.step_id }}['{{ output_ref.connected_output.value_name }}']
terminal_print(pipeline_result_{{ field_name }}, in_panel='Pipeline result metadata: [b]{{ field_name }}[/b]')
terminal_print(pipeline_result_{{ field_name }}.data, in_panel='Pipeline result data: [b]{{ field_name }}[/b]')
{% endfor %}