Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
Size: Mime:
import json
import time
import traceback
import typing as t
import warnings

from sarus_data_spec import typing as st
from sarus_data_spec.constants import (
    EXTENDED_TABLE_MAPPING,
    SQL_CACHING_URI,
    TABLE_MAPPING,
    PUSH_SQL_TASK,
)
from sarus_data_spec.manager.computations.local.base import LocalComputation
from sarus_data_spec.manager.computations.local.parquet import logger

try:
    from sarus_data_spec.manager.ops.source.to_sql import async_push_sql_op
except ModuleNotFoundError:
    warnings.warn("Source push_sql ops not available.")
from sarus_data_spec.status import DataSpecErrorStatus, error, ready


class PushSQLComputation(LocalComputation[t.Mapping[str, str]]):
    """Class responsible for exporting a dataspec into a
    SQL destination (may be outside Sarus).

    On success the task is marked ready and the pushed URI plus the
    JSON-encoded table mappings returned by ``async_push_sql_op`` are stored
    as the task's properties. On failure an error status carrying the
    traceback is recorded and a ``DataSpecErrorStatus`` is raised.
    """

    task_name = PUSH_SQL_TASK

    def _mark_error(
        self, dataspec: st.DataSpec, relaunch: bool, message: str
    ) -> DataSpecErrorStatus:
        """Record an error status for this task and build the exception to raise.

        Args:
            dataspec: the dataspec whose push failed.
            relaunch: flag stored (stringified) in the status and passed on
                to the raised ``DataSpecErrorStatus``.
            message: formatted traceback of the failure.

        Returns:
            The ``DataSpecErrorStatus`` the caller should raise.
        """
        error(
            dataspec=dataspec,
            manager=self.computing_manager(),
            task=self.task_name,
            properties={
                "message": message,
                "relaunch": str(relaunch),
            },
        )
        return DataSpecErrorStatus((relaunch, message))

    async def prepare(self, dataspec: st.DataSpec) -> None:
        """Push ``dataspec`` to SQL and record the outcome as the task status.

        Args:
            dataspec: the dataspec to export; it is treated as a dataset.

        Raises:
            DataSpecErrorStatus: if the push fails. A ``DataSpecErrorStatus``
                raised by the push keeps its ``relaunch`` flag. Any other
                exception is reported with ``relaunch=False``. In both cases
                an error status with the traceback is recorded before raising,
                and the original exception is chained as the cause.
        """
        logger.info(f"STARTING push_sql {dataspec.uuid()}")
        start = time.perf_counter()
        try:
            dataset = t.cast(st.Dataset, dataspec)
            pushed_uri, encoded_map, expanded_map = await async_push_sql_op(
                dataset
            )
        except DataSpecErrorStatus as exception:
            raise self._mark_error(
                dataspec, exception.relaunch, traceback.format_exc()
            ) from exception
        except Exception as exception:
            # Any other failure is reported with relaunch=False. This also
            # covers a NameError when the optional to_sql op failed to import.
            raise self._mark_error(
                dataspec, False, traceback.format_exc()
            ) from exception
        else:
            end = time.perf_counter()
            logger.info(
                f"FINISHED push_sql {dataspec.uuid()} ({end-start:.2f}s)"
            )
            # Use self.task_name (as the error path does) so that ready and
            # error statuses are always recorded under the same task.
            ready(
                dataspec=dataspec,
                manager=self.computing_manager(),
                task=self.task_name,
                properties={
                    SQL_CACHING_URI: pushed_uri,
                    TABLE_MAPPING: json.dumps(encoded_map),
                    EXTENDED_TABLE_MAPPING: json.dumps(expanded_map),
                },
            )

    async def result_from_stage_properties(
        self,
        dataspec: st.DataSpec,
        properties: t.Mapping[str, str],
        **kwargs: t.Any,
    ) -> t.Mapping[str, str]:
        """Return ``properties`` unchanged as the computation result.

        These are presumably the properties stored by ``ready`` in
        :meth:`prepare` (SQL caching URI and table mappings). Confirm this
        against the ``LocalComputation`` caller.
        """
        return properties