Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import typing as t

from pyarrow import csv
import pyarrow as pa

try:
    import clevercsv
except ModuleNotFoundError:
    pass
from sarus_data_spec.arrow.type import type_from_arrow
from sarus_data_spec.constants import DATASET_SLUGNAME
from sarus_data_spec.manager.ops.source.csv.inference import infer_dates
from sarus_data_spec.schema import Schema
from sarus_data_spec.schema import schema as schema_builder
from sarus_data_spec.type import Struct
import sarus_data_spec.protobuf as sp
import sarus_data_spec.typing as st


async def csv_schema(dataset: st.Dataset) -> Schema:
    sources_ds = dataset.sources(sp.type_name(sp.Dataset))
    admin_sources = [source for source in sources_ds if not source.is_public()]
    assert len(admin_sources) == 1
    uri = admin_sources[0].protobuf().spec.file.uri
    source_fs = dataset.manager().filesystem(uri)  # type:ignore
    table, parse_options = load_csv_table(source_fs, uri)

    new_fields = [
        pa.field(name, orig_type)
        if orig_type != pa.string()
        else pa.field(name, pa.large_string())
        for name, orig_type in zip(table.schema.names, table.schema.types)
    ]
    table = table.cast(pa.schema(new_fields))
    column_names = convert_names(table.schema.names)
    table = table.rename_columns(column_names)
    csv_fields = {
        col: type_from_arrow(
            arrow_type=table.field(col).type,
            nullable=table.field(col).nullable,
        )
        for col in table.schema.names
    }
    schema_type = Struct(fields=csv_fields)

    # date_inference
    updated_type = infer_dates(
        _type=schema_type,
        arrow_array=pa.StructArray.from_arrays(
            arrays=[el.combine_chunks() for el in table.columns],
            names=table.column_names,
        ),
    )

    sarus_schema = schema_builder(
        dataset=dataset,
        schema_type=updated_type,
        properties={"delimiter": parse_options.delimiter},
        name=dataset.properties().get(DATASET_SLUGNAME, None),
    )
    return sarus_schema


def load_csv_table(
    source_fs: t.Any, uri: str
) -> t.Tuple[pa.lib.Table, pa._csv.ParseOptions]:
    # load csv table from a given file system by it's path
    # returns a table and parse options for csv reader
    with source_fs.open(uri) as f:
        sample = "".join([f.readline().decode("utf-8") for _ in range(100)])
    read_options, parse_options = csv_params_from_clevercsv(sample=sample)
    convert_options = csv.ConvertOptions(
        strings_can_be_null=True,
        timestamp_parsers=[csv.ISO8601, "%Y-%m-%d"],
        true_values=["True"],
        false_values=["False"],
    )  # we could provide a more exhaustive list of dates and it would
    # work.
    # The problem is that then it is not possible to know directly
    # which format was matched
    # in a straight-forward way. So for less conventional
    # dates inference is kept in the user input transform.

    table = csv.read_csv(
        pa.py_buffer(source_fs.open(uri).read()),
        read_options=read_options,
        parse_options=parse_options,
        convert_options=convert_options,
    )
    return table, parse_options


def csv_size_non_dp(source_fs: t.Any, uri: str) -> int:
    # give a non DP size in lines for a given filesystem and path
    table, _ = load_csv_table(source_fs, uri)
    return len(table)


def csv_params_from_clevercsv(
    sample: str,
) -> t.Tuple[csv.ReadOptions, csv.ParseOptions]:
    """Build pyarrow csv.ReadOption from clevercsv sniffing"""

    sniffer = clevercsv.Sniffer()
    dialect = sniffer.sniff(sample)
    assert dialect
    has_header = sniffer.has_header(sample)

    read_options = csv.ReadOptions(autogenerate_column_names=not has_header)
    parse_options = csv.ParseOptions(delimiter=dialect.delimiter)
    return read_options, parse_options


def convert_names(columns: t.List[str]) -> t.List[str]:
    columns_dict: t.Dict[str, int] = {}
    for idx, c in enumerate(columns):
        if not c:  # no column name
            columns[idx] = generate_original_column_name(
                idx, columns, "column"
            )
        elif c in columns_dict:  # duplicate column name
            columns_dict[c] += 1
            columns[idx] = generate_original_column_name(
                columns_dict[c], columns, c
            )
        else:
            columns_dict[c] = 0
    return columns


def generate_original_column_name(
    idx: int, columns: t.List[str], base_name: str = "column"
) -> str:
    """A small recursive function to create a new column name that does not
    exist yet"""
    candidate = f"{base_name}_{idx}"
    if candidate in columns:
        return generate_original_column_name(idx + 1, columns, base_name)
    else:
        return candidate