Repository URL to install this package:
|
Version:
4.5.4.dev1 ▾
|
import json
import time
import traceback
import typing as t
import warnings
from sarus_data_spec import typing as st
from sarus_data_spec.constants import (
EXTENDED_TABLE_MAPPING,
SQL_CACHING_URI,
TABLE_MAPPING,
PUSH_SQL_TASK,
)
from sarus_data_spec.manager.computations.local.base import LocalComputation
from sarus_data_spec.manager.computations.local.parquet import logger
try:
from sarus_data_spec.manager.ops.source.to_sql import async_push_sql_op
except ModuleNotFoundError:
warnings.warn("Source push_sql ops not available.")
from sarus_data_spec.status import DataSpecErrorStatus, error, ready
class PushSQLComputation(LocalComputation[t.Mapping[str, str]]):
"""Class responsible for exporting a dataspec into a
SQL destination (may be outside Sarus)"""
task_name = PUSH_SQL_TASK
async def prepare(self, dataspec: st.DataSpec) -> None:
logger.info(f"STARTING push_sql {dataspec.uuid()}")
start = time.perf_counter()
try:
dataset = t.cast(st.Dataset, dataspec)
pushed_uri, encoded_map, expanded_map = await async_push_sql_op(
dataset
)
except DataSpecErrorStatus as exception:
error(
dataspec=dataspec,
manager=self.computing_manager(),
task=self.task_name,
properties={
"message": traceback.format_exc(),
"relaunch": str(exception.relaunch),
},
)
raise DataSpecErrorStatus(
(exception.relaunch, traceback.format_exc())
)
except Exception:
error(
dataspec=dataspec,
manager=self.computing_manager(),
task=self.task_name,
properties={
"message": traceback.format_exc(),
"relaunch": str(False),
},
)
raise DataSpecErrorStatus((False, traceback.format_exc()))
else:
end = time.perf_counter()
logger.info(
f"FINISHED push_sql {dataspec.uuid()} ({end-start:.2f}s)"
)
ready(
dataspec=dataspec,
manager=self.computing_manager(),
task=PUSH_SQL_TASK,
properties={
SQL_CACHING_URI: pushed_uri,
TABLE_MAPPING: json.dumps(encoded_map),
EXTENDED_TABLE_MAPPING: json.dumps(expanded_map),
},
)
async def result_from_stage_properties(
self,
dataspec: st.DataSpec,
properties: t.Mapping[str, str],
**kwargs: t.Any,
) -> t.Mapping[str, str]:
"""For the moment it returns None but it can
return maybe the ready status.
"""
return properties