# Repository URL to install this package:
# Version: 4.5.4.dev1
import logging
import time
import traceback
import typing as t
import pyarrow as pa
from sarus_data_spec import typing as st
from sarus_data_spec.constants import SQL_TASK
from sarus_data_spec.manager.base import Base
from sarus_data_spec.manager.computations.local.base import LocalComputation
from sarus_data_spec.manager.computations.local.to_sql import ToSQLComputation
from sarus_data_spec.status import DataSpecErrorStatus, error, ready
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Generic type variable; it is not referenced in the visible part of this file.
T = t.TypeVar("T")
class SQLComputation(LocalComputation[t.AsyncIterator[pa.RecordBatch]]):
    """Computation responsible for SQL preparing a dataset.

    For SQL prepared datasets, SQL queries can be submitted without waiting.
    """

    task_name = SQL_TASK

    def __init__(
        self,
        computing_manager: Base,
        to_sql_computation: ToSQLComputation,
    ) -> None:
        """Initialise the computation.

        Args:
            computing_manager: Manager forwarded to the base computation.
            to_sql_computation: ToSQL computation used by `prepare` and
                `result_from_stage_properties` when the manager reports
                the dataset as cached to SQL.
        """
        super().__init__(computing_manager)
        self.to_sql_computation = to_sql_computation

    async def prepare(self, dataspec: st.DataSpec) -> None:
        """SQL prepare `dataspec` and report the outcome for `task_name`.

        When the manager reports the dataset as cached to SQL, the ToSQL
        computation's task result is awaited; otherwise the manager's
        `async_sql_prepare_parents` is awaited. On success `ready` is
        called for this task; on failure `error` is called with the
        formatted traceback and a relaunch flag.

        Args:
            dataspec: The dataspec to prepare; it is cast to a dataset.

        Raises:
            DataSpecErrorStatus: Re-raised unchanged when the preparation
                raised it, or raised with relaunch set to False (chained
                to the original exception) for any other failure.
        """
        try:
            logger.info(f"STARTED SQL {dataspec.uuid()}")
            start = time.perf_counter()
            dataset = t.cast(st.Dataset, dataspec)
            if self.computing_manager().is_cached_to_sql(dataset):
                await self.to_sql_computation.task_result(dataset)
            else:
                await self.computing_manager().async_sql_prepare_parents(
                    dataset
                )
        except DataSpecErrorStatus as exception:
            # Keep the relaunch flag carried by the original error.
            error(
                dataspec=dataspec,
                manager=self.computing_manager(),
                task=self.task_name,
                properties={
                    "message": traceback.format_exc(),
                    "relaunch": str(exception.relaunch),
                },
            )
            raise
        except Exception as exception:
            # Format the traceback once so the reported status and the
            # raised exception carry exactly the same message.
            message = traceback.format_exc()
            error(
                dataspec=dataspec,
                manager=self.computing_manager(),
                task=self.task_name,
                properties={
                    "message": message,
                    "relaunch": str(False),
                },
            )
            # Chain explicitly so the root cause is kept as __cause__.
            raise DataSpecErrorStatus((False, message)) from exception
        else:
            end = time.perf_counter()
            logger.info(f"FINISHED SQL {dataspec.uuid()} ({end-start:.2f}s)")
            ready(
                dataspec=dataspec,
                manager=self.computing_manager(),
                task=self.task_name,
            )

    async def task_result(
        self, dataspec: st.DataSpec, **kwargs: t.Any
    ) -> t.AsyncIterator[pa.RecordBatch]:
        """Return the query result without touching the computation status.

        Overrides task_result to avoid changing the computation status if
        an SQL query fails: when reading the result fails, no error status
        is added and an exception is simply raised. The relaunch flag is
        also set to False to avoid the task being relaunched when queries
        are failing.

        Args:
            dataspec: The dataspec to query.
            **kwargs: Forwarded to `result_from_stage_properties`.

        Raises:
            AssertionError: If the task stage is missing or not ready
                after `complete_task`.
            DataSpecErrorStatus: With relaunch set to False, for any
                failure while producing the result.
        """
        status = await self.complete_task(dataspec=dataspec)
        stage = status.task(self.task_name)
        # Kept as assertions so callers still observe AssertionError.
        assert stage
        assert stage.ready()
        try:
            return await self.result_from_stage_properties(
                dataspec, stage.properties(), **kwargs
            )
        except DataSpecErrorStatus as exception:
            exception.relaunch = False
            raise
        except Exception as exception:
            # Chain explicitly so the root cause is kept as __cause__.
            raise DataSpecErrorStatus(
                (False, traceback.format_exc())
            ) from exception

    async def result_from_stage_properties(
        self,
        dataspec: st.DataSpec,
        properties: t.Mapping[str, str],
        **kwargs: t.Any,
    ) -> t.AsyncIterator[pa.RecordBatch]:
        """Execute the query on an SQL prepared dataset.

        When the manager reports the dataset as cached to SQL, the ready
        ToSQL stage is read and its result is passed as
        `caching_properties` to the manager's `execute_sql_query`;
        otherwise the manager's `async_sql_op` is awaited.

        Args:
            dataspec: The dataspec to query; it is cast to a dataset.
            properties: Stage properties; not read by this implementation.
            **kwargs: Must contain `query`, `dialect`, `batch_size` and
                `result_type`.

        Raises:
            KeyError: If one of the required keyword arguments is missing.
            AssertionError: If the ToSQL status or stage is missing, or
                the stage is not ready, for a dataset cached to SQL.
        """
        dataset = t.cast(st.Dataset, dataspec)
        query = kwargs["query"]
        dialect = kwargs["dialect"]
        batch_size = kwargs["batch_size"]
        result_type = kwargs["result_type"]
        if self.computing_manager().is_cached_to_sql(dataset):
            status = self.to_sql_computation.status(dataspec)
            assert status
            stage = status.task(self.to_sql_computation.task_name)
            assert stage
            assert stage.ready()
            caching_properties = (
                await self.to_sql_computation.result_from_stage_properties(
                    dataspec,
                    stage.properties(),
                )
            )
            return await self.computing_manager().execute_sql_query(
                dataset,
                caching_properties=caching_properties,
                query=query,
                dialect=dialect,
                result_type=result_type,
                batch_size=batch_size,
            )
        return await self.computing_manager().async_sql_op(
            dataset=dataset,
            query=query,
            dialect=dialect,
            batch_size=batch_size,
            result_type=result_type,
        )