Repository URL to install this package:
|
Version:
4.5.4.dev1 ▾
|
import typing as t
import pyarrow as pa
from sarus_data_spec.manager.ops.base import (
DatasetImplementation,
DatasetStaticChecker,
)
import sarus_data_spec.manager.ops.source.sql as sql
import sarus_data_spec.typing as st
class SourceSQL(DatasetImplementation):
    """Implementation of the dataset operations for a SQL source dataset.

    Schema and Arrow extraction are delegated to the helpers of the
    ``sql`` module. The ``parent_*`` methods always raise, their error
    message stating that they are not implemented for a source dataset.
    """

    async def schema(self) -> st.Schema:
        """Return the schema of the dataset.

        Raises:
            ValueError: if the dataset spec has no ``sql`` field set.
        """
        # Guard clause: only specs carrying a ``sql`` field are supported.
        if not self.dataset.protobuf().spec.HasField("sql"):
            raise ValueError("Schema type not implemented yet")
        return await sql._sql_schema(dataset=self.dataset)

    async def to_arrow(
        self, batch_size: int
    ) -> t.AsyncIterator[pa.RecordBatch]:
        """Return an async iterator of Arrow record batches for the dataset.

        The schema is first obtained through the dataset's manager and
        then handed, together with ``batch_size``, to
        ``sql._sql_to_arrow``.
        """
        dataset = self.dataset
        sarus_schema = await dataset.manager().async_schema(dataset=dataset)
        return await sql._sql_to_arrow(
            dataset=dataset,
            batch_size=batch_size,
            sarus_schema=sarus_schema,
        )

    async def parent_schema(self) -> st.Schema:
        """Always raise ``ValueError``: not implemented for a source."""
        raise ValueError("Not Implemented for source dataset")

    # NOTE(review): the return annotation is ``st.Schema``, the same as
    # ``parent_schema`` -- confirm that this is the intended type for a
    # parent *value*.
    async def parent_value(self) -> st.Schema:
        """Always raise ``ValueError``: not implemented for a source."""
        raise ValueError("Not Implemented for source dataset")

    async def parent_to_arrow(
        self, batch_size: int = 10000
    ) -> t.AsyncIterator[pa.RecordBatch]:
        """Always raise ``ValueError``: not implemented for a source."""
        raise ValueError("Not Implemented for source dataset")
class SourceSQLStaticChecker(DatasetStaticChecker):
    """Static checker for a SQL source dataset."""

    async def schema(self) -> st.Schema:
        """Return the schema of the dataset.

        The schema is computed by ``sql._sql_schema``.

        Raises:
            ValueError: if the dataset spec has no ``sql`` field set.
        """
        # Guard clause: only specs carrying a ``sql`` field are supported.
        if not self.dataset.protobuf().spec.HasField("sql"):
            raise ValueError("Schema type not implemented yet")
        return await sql._sql_schema(dataset=self.dataset)