Repository URL to install this package:
|
Version:
0.4.29 ▾
|
# -*- coding: utf-8 -*-
import functools
import logging
import os
import shutil
from pathlib import Path
from typing import Any, Dict, List, Mapping, Tuple, Type, Union
from pydantic import Field
from kiara.exceptions import KiaraException, KiaraProcessingException
from kiara.models.filesystem import KiaraFileBundle
from kiara.models.module import KiaraModuleConfig
from kiara.models.module.jobs import JobLog
from kiara.models.values.value import SerializedData, Value, ValueMap
from kiara.modules import KiaraModule, ValueMapSchema
from kiara.modules.included_core_modules.create_from import (
CreateFromModule,
CreateFromModuleConfig,
)
from kiara.modules.included_core_modules.export_as import DataExportModule
from kiara.modules.included_core_modules.serialization import DeserializeValueModule
from kiara.utils import find_free_id, log_message
from kiara_plugin.tabular.defaults import TABLE_COLUMN_SPLIT_MARKER
from kiara_plugin.tabular.models.tables import KiaraTables
from kiara_plugin.tabular.utils import create_table_from_file_bundle
class DeserializeTableModule(DeserializeValueModule):
_module_type_name = "load.tables"
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
return {"python_object": KiaraTables}
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "tables"
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
return "feather"
def to__python_object(self, data: SerializedData, **config: Any):
import pyarrow as pa
tables: Dict[str, Any] = {}
for column_id in data.get_keys():
if TABLE_COLUMN_SPLIT_MARKER not in column_id:
raise KiaraException(
f"Invalid serialized 'tables' data, key must contain '{TABLE_COLUMN_SPLIT_MARKER}': {column_id}"
)
table_id, column_name = column_id.split(
TABLE_COLUMN_SPLIT_MARKER, maxsplit=1
)
chunks = data.get_serialized_data(column_id)
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
assert len(files) == 1
file = files[0]
with pa.memory_map(file, "r") as column_chunk:
loaded_arrays: pa.Table = pa.ipc.open_file(column_chunk).read_all()
column = loaded_arrays.column(column_name)
tables.setdefault(table_id, {})[column_name] = column
table = KiaraTables.create_tables(tables)
return table
class CreateTablesModuleConfig(CreateFromModuleConfig):
ignore_errors: bool = Field(
description="Whether to ignore convert errors and omit the failed items.",
default=False,
)
# merge_into_single_table: bool = Field(
# description="Whether to merge all csv files into a single table.", default=False
# )
include_source_metadata: Union[bool, None] = Field(
description="Whether to include a table with metadata about the source files.",
default=None,
)
include_source_file_content: bool = Field(
description="When including source metadata, whether to also include the original raw (string) content.",
default=False,
)
class CreateDatabaseModule(CreateFromModule):
_module_type_name = "create.tables"
_config_cls = CreateTablesModuleConfig
def create__tables__from__file_bundle(
self, source_value: Value, job_log: JobLog
) -> Any:
"""Create a database from a file_bundle value.
Currently, only csv files are supported, files in the source file_bundle that have different extensions will be ignored.
Unless 'merge_into_single_table' is set to 'True' in the module configuration, each csv file will create one table
in the resulting database. If this option is set, only a single table with all the values of all
csv files will be created. For this to work, all csv files should follow the same schema.
"""
from pyarrow import csv as pa_csv
include_raw_content_in_file_info: Union[bool, None] = self.get_config_value(
"include_source_metadata"
)
tables = {}
bundle: KiaraFileBundle = source_value.data
table_names: List[str] = []
included_files: Dict[str, bool] = {}
errors: Dict[str, Union[None, str]] = {}
for rel_path in sorted(bundle.included_files.keys()):
if not rel_path.endswith(".csv"):
job_log.add_log(
f"Ignoring file (not csv): {rel_path}", log_level=logging.INFO
)
included_files[rel_path] = False
errors[rel_path] = "Not a csv file."
continue
file_item = bundle.included_files[rel_path]
table_name = find_free_id(
stem=file_item.file_name_without_extension, current_ids=table_names
)
try:
table_names.append(table_name)
table = pa_csv.read_csv(file_item.path)
tables[table_name] = table
included_files[rel_path] = True
except Exception as e:
included_files[rel_path] = False
errors[rel_path] = KiaraException.get_root_details(e)
if self.get_config_value("ignore_errors") is True or True:
log_message("ignore.import_file", file=rel_path, reason=str(e))
continue
raise KiaraProcessingException(e)
if include_raw_content_in_file_info in [None, True]:
include_content: bool = self.get_config_value("include_source_file_content")
if "file_items" in tables:
raise KiaraProcessingException(
"Can't create table: 'file_items' columns already exists."
)
table = create_table_from_file_bundle(
file_bundle=source_value.data,
include_content=include_content,
included_files=included_files,
errors=errors,
)
tables["file_items"] = table
return tables
class AssembleTablesConfig(KiaraModuleConfig):
"""Configuration for the 'assemble.tables' module."""
number_of_tables: Union[int, None] = Field(
description="How many tables should be merged. If 'table_names' is empty, this defaults to '2', otherwise the length of the 'table_names' input.",
default=None,
)
table_names: Union[List[str], None] = Field(
description="A pre-defined list of table names. If not defined, users will be asked for the table name(s).",
default=None,
)
class AssembleTablesModule(KiaraModule):
"""Assemble a 'tables' value from multiple tables.
Depending on the module configuration, 2 or more tables can be merged into a single 'tables' value.
"""
_module_type_name = "assemble.tables"
_config_cls = AssembleTablesConfig
@functools.cached_property
def _table_details(self) -> Tuple[int, Union[List[str], None]]:
number_tables: Union[int, None] = self.get_config_value("number_of_tables")
table_names: Union[None, List[str]] = self.get_config_value("table_names")
if not table_names:
if not number_tables:
number_tables = 2
elif not number_tables:
number_tables = len(table_names)
elif not number_tables == len(table_names):
raise KiaraException(
"The 'number_of_tables' and length of 'table_names' config option must match."
)
if number_tables < 2:
raise KiaraException("The 'number_of_tables' must be at least 2.")
return number_tables, table_names
@property
def number_of_tables(self) -> int:
number_tables, _ = self._table_details
return number_tables
@property
def table_names(self) -> Union[List[str], None]:
_, table_names = self._table_details
return table_names
def create_inputs_schema(
self,
) -> ValueMapSchema:
number_tables = self.number_of_tables
table_names = self.table_names
if not table_names:
if not number_tables:
number_tables = 2
elif not number_tables:
number_tables = len(table_names)
elif not number_tables == len(table_names):
raise KiaraException(
"The 'number_of_tables' and length of 'table_names' config option must match."
)
if number_tables < 2:
raise KiaraException("The 'number_of_tables' must be at least 2.")
inputs_schema = {}
if not table_names:
for i in range(1, number_tables + 1):
inputs_schema[f"table_name_{i}"] = {
"type": "string",
"doc": f"The alias for table #{i}.",
}
inputs_schema[f"table_{i}"] = {
"type": "table",
"doc": f"The table to merge (#{i}).",
}
else:
for table_name in table_names:
inputs_schema[f"table_{table_name}"] = {
"type": "table",
"doc": f"The table to merge for alias '{table_name}'.",
}
return inputs_schema
def create_outputs_schema(
self,
) -> ValueMapSchema:
outputs = {
"tables": {
"type": "tables",
"doc": "The assembled tables instance.",
}
}
return outputs
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:
number_tables = self.number_of_tables
table_names = self.table_names
tables: Dict[str, Any] = {}
if not table_names:
for i in range(1, number_tables + 1):
table_name = inputs.get_value_data(f"table_name_{i}")
table = inputs.get_value_obj(f"table_{i}")
if table_name in tables.keys():
raise KiaraException(f"Duplicate table name: '{table_name}'")
tables[table_name] = table
else:
for table_name in table_names:
table = inputs.get_value_obj(f"table_{table_name}")
tables[table_name] = table
outputs.set_value("tables", tables)
class ExportNetworkDataModule(DataExportModule):
"""Export network data items."""
_module_type_name = "export.tables"
# def export__network_data__as__graphml_file(
# self, value: NetworkData, base_path: str, name: str
# ):
# """Export network data as graphml file."""
#
# import networkx as nx
#
# target_path = os.path.join(base_path, f"{name}.graphml")
#
# # TODO: can't just assume digraph
# graph: nx.Graph = value.as_networkx_graph(nx.DiGraph)
# nx.write_graphml(graph, target_path)
#
# return {"files": target_path}
#
def export__tables__as__sqlite_db(
self, value: KiaraTables, base_path: str, name: str
):
"""Export network data as a sqlite database file."""
from kiara_plugin.tabular.utils.tables import create_database_from_tables
db = create_database_from_tables(tables=value)
target_path = os.path.abspath(os.path.join(base_path, f"{name}.sqlite"))
shutil.move(db.db_file_path, target_path)
return {"files": target_path}
def export__tables__as__sql_dump(
self, value: KiaraTables, base_path: str, name: str
):
"""Export network data as a sql dump file."""
import sqlite_utils
from kiara_plugin.tabular.utils.tables import create_database_from_tables
kiara_db = create_database_from_tables(tables=value)
db = sqlite_utils.Database(kiara_db.db_file_path)
target_path = Path(os.path.join(base_path, f"{name}.sql"))
with target_path.open("wt") as f:
for line in db.conn.iterdump():
f.write(line + "\n")
return {"files": target_path.as_posix()}
def export__tables__as__csv_files(
self, value: KiaraTables, base_path: str, name: str
):
"""Export network data as 2 csv files (one for edges, one for nodes."""
from pyarrow import csv
files = []
for table_name in value.table_names:
target_path = os.path.join(base_path, f"{name}__{table_name}.csv")
os.makedirs(os.path.dirname(target_path), exist_ok=True)
table = value.get_table(table_name)
csv.write_csv(table.arrow_table, target_path)
files.append(target_path)
return {"files": files}