Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import typing as t

import pyarrow as pa

from sarus_data_spec.manager.ops.base import (
    DatasetImplementation,
    DatasetStaticChecker,
)
import sarus_data_spec.manager.ops.source.sql as sql
import sarus_data_spec.typing as st


class SourceSQL(DatasetImplementation):
    """Dataset implementation for a source dataset backed by an `sql` spec.

    Schema and arrow computations are delegated to the helpers of the
    `sql` ops module. As a source dataset it has no parent, so every
    `parent_*` accessor raises `ValueError`.
    """

    async def schema(self) -> st.Schema:
        """Return the schema computed by `sql._sql_schema`.

        Raises:
            ValueError: if the dataset's protobuf spec has no `sql` field.
        """
        spec = self.dataset.protobuf().spec
        # Guard clause: only the `sql` spec field is handled here.
        if not spec.HasField("sql"):
            raise ValueError("Schema type not implemented yet")
        return await sql._sql_schema(dataset=self.dataset)

    async def to_arrow(
        self, batch_size: int
    ) -> t.AsyncIterator[pa.RecordBatch]:
        """Return the result of `sql._sql_to_arrow` for this dataset.

        The dataset's schema is first obtained through its manager and
        then handed to the helper together with `batch_size`.
        """
        manager = self.dataset.manager()
        sarus_schema = await manager.async_schema(dataset=self.dataset)
        batches = await sql._sql_to_arrow(
            dataset=self.dataset,
            batch_size=batch_size,
            sarus_schema=sarus_schema,
        )
        return batches

    async def parent_schema(self) -> st.Schema:
        """Always raise `ValueError`: not implemented for a source dataset."""
        raise ValueError("Not Implemented for source dataset")

    async def parent_value(self) -> st.Schema:
        """Always raise `ValueError`: not implemented for a source dataset."""
        # NOTE(review): the return annotation is `st.Schema` although the
        # method is named `parent_value` -- confirm against the base class.
        raise ValueError("Not Implemented for source dataset")

    async def parent_to_arrow(
        self, batch_size: int = 10000
    ) -> t.AsyncIterator[pa.RecordBatch]:
        """Always raise `ValueError`: not implemented for a source dataset."""
        raise ValueError("Not Implemented for source dataset")


class SourceSQLStaticChecker(DatasetStaticChecker):
    """Static checker for a dataset whose spec carries an `sql` field."""

    async def schema(self) -> st.Schema:
        """Return the schema computed by `sql._sql_schema`.

        Raises:
            ValueError: if the dataset's protobuf spec has no `sql` field.
        """
        has_sql_spec = self.dataset.protobuf().spec.HasField("sql")
        # Guard clause: any spec other than `sql` is rejected.
        if not has_sql_spec:
            raise ValueError("Schema type not implemented yet")
        return await sql._sql_schema(dataset=self.dataset)