Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import logging
import time
import traceback
import typing as t

import pyarrow as pa

from sarus_data_spec import typing as st
from sarus_data_spec.constants import SQL_TASK
from sarus_data_spec.manager.base import Base
from sarus_data_spec.manager.computations.local.base import LocalComputation
from sarus_data_spec.manager.computations.local.to_sql import ToSQLComputation
from sarus_data_spec.status import DataSpecErrorStatus, error, ready

# Module-scoped logger: records emitted here carry this module's dotted name.
logger: logging.Logger = logging.getLogger(name=__name__)

# Generic type variable (not used by SQLComputation below).
T = t.TypeVar("T")


class SQLComputation(LocalComputation[t.AsyncIterator[pa.RecordBatch]]):
    """Computation responsible for SQL-preparing a dataset.

    Once a dataset is SQL-prepared, SQL queries can be submitted against it
    without waiting for further preparation. Queries are run through
    ``task_result``, with the query parameters passed as keyword arguments.
    """

    task_name = SQL_TASK

    def __init__(
        self,
        computing_manager: Base,
        to_sql_computation: ToSQLComputation,
    ) -> None:
        """Create the computation.

        Args:
            computing_manager: manager forwarded to ``LocalComputation``.
            to_sql_computation: computation consulted when the dataset is
                cached to SQL (see ``prepare`` and
                ``result_from_stage_properties``).
        """
        super().__init__(computing_manager)
        self.to_sql_computation = to_sql_computation

    async def prepare(self, dataspec: st.DataSpec) -> None:
        """SQL-prepare ``dataspec`` and record the outcome as a task status.

        If the manager reports the dataset as cached to SQL, the
        ``ToSQLComputation`` result is awaited; otherwise the manager
        SQL-prepares the dataset's parents.

        On success, a ``ready`` status is recorded for this task. On failure,
        an ``error`` status is recorded with the formatted traceback and the
        relaunch flag, and a ``DataSpecErrorStatus`` is raised.

        Raises:
            DataSpecErrorStatus: re-raised unchanged if preparation raised
                one; otherwise a new one with ``relaunch=False`` whose cause
                is the original exception.
        """
        try:
            logger.info("STARTED SQL %s", dataspec.uuid())
            start = time.perf_counter()
            dataset = t.cast(st.Dataset, dataspec)
            if self.computing_manager().is_cached_to_sql(dataset):
                # The result is discarded; only completion of the awaited
                # ToSQL task result matters here.
                await self.to_sql_computation.task_result(dataset)
            else:
                await self.computing_manager().async_sql_prepare_parents(
                    dataset
                )

        except DataSpecErrorStatus as exception:
            # Preserve the relaunch decision carried by the raised status.
            error(
                dataspec=dataspec,
                manager=self.computing_manager(),
                task=self.task_name,
                properties={
                    "message": traceback.format_exc(),
                    "relaunch": str(exception.relaunch),
                },
            )
            raise
        except Exception as exception:
            # Unexpected failure: record it as non-relaunchable and convert
            # it into a DataSpecErrorStatus, chaining the original cause.
            message = traceback.format_exc()
            error(
                dataspec=dataspec,
                manager=self.computing_manager(),
                task=self.task_name,
                properties={
                    "message": message,
                    "relaunch": str(False),
                },
            )
            raise DataSpecErrorStatus((False, message)) from exception
        else:
            end = time.perf_counter()
            logger.info(
                "FINISHED SQL %s (%.2fs)", dataspec.uuid(), end - start
            )
            ready(
                dataspec=dataspec,
                manager=self.computing_manager(),
                task=self.task_name,
            )

    async def task_result(
        self, dataspec: st.DataSpec, **kwargs: t.Any
    ) -> t.AsyncIterator[pa.RecordBatch]:
        """Run a query on the SQL-prepared ``dataspec`` and return its result.

        Overrides the base ``task_result`` so that a failing SQL query does
        not change the computation status. If
        ``self.result_from_stage_properties`` fails, no error status is
        recorded; an exception is simply raised. Its ``relaunch`` flag is
        forced to ``False`` so that failing queries do not cause the task to
        be relaunched.

        Args:
            dataspec: the dataset to query.
            **kwargs: forwarded to ``result_from_stage_properties``. This
                class's implementation requires ``query``, ``dialect``,
                ``batch_size`` and ``result_type``.

        Raises:
            DataSpecErrorStatus: with ``relaunch=False`` if computing the
                query result fails.
        """
        status = await self.complete_task(dataspec=dataspec)
        stage = status.task(self.task_name)
        assert stage
        assert stage.ready()
        try:
            return await self.result_from_stage_properties(
                dataspec, stage.properties(), **kwargs
            )
        except DataSpecErrorStatus as exception:
            exception.relaunch = False
            raise
        except Exception as exception:
            raise DataSpecErrorStatus(
                (False, traceback.format_exc())
            ) from exception

    async def result_from_stage_properties(
        self,
        dataspec: st.DataSpec,
        properties: t.Mapping[str, str],
        **kwargs: t.Any,
    ) -> t.AsyncIterator[pa.RecordBatch]:
        """Execute an SQL query on an SQL-prepared dataset.

        Args:
            dataspec: the dataset to query.
            properties: this task's stage properties; not used by this
                implementation (kept for the overridden signature).
            **kwargs: must contain ``query``, ``dialect``, ``batch_size`` and
                ``result_type``; a missing key raises ``KeyError``.

        Returns:
            Whatever the manager returns. That is ``execute_sql_query`` when
            the dataset is cached to SQL, and ``async_sql_op`` otherwise.
            NOTE(review): the concrete type presumably depends on
            ``result_type``; confirm against the manager.
        """
        dataset = t.cast(st.Dataset, dataspec)
        query = kwargs["query"]
        dialect = kwargs["dialect"]
        batch_size = kwargs["batch_size"]
        result_type = kwargs["result_type"]

        if self.computing_manager().is_cached_to_sql(dataset):
            # Query the SQL cache: the (ready) ToSQL stage yields the caching
            # properties that execute_sql_query needs.
            status = self.to_sql_computation.status(dataspec)
            assert status
            stage = status.task(self.to_sql_computation.task_name)
            assert stage
            assert stage.ready()
            caching_properties = (
                await self.to_sql_computation.result_from_stage_properties(
                    dataspec,
                    stage.properties(),
                )
            )
            return await self.computing_manager().execute_sql_query(
                dataset,
                caching_properties=caching_properties,
                query=query,
                dialect=dialect,
                result_type=result_type,
                batch_size=batch_size,
            )
        return await self.computing_manager().async_sql_op(
            dataset=dataset,
            query=query,
            dialect=dialect,
            batch_size=batch_size,
            result_type=result_type,
        )