Repository URL to install this package:
|
Version:
4.5.4.dev1 ▾
|
import typing as t
from pyarrow import csv
import pyarrow as pa
try:
import clevercsv
except ModuleNotFoundError:
pass
from sarus_data_spec.arrow.type import type_from_arrow
from sarus_data_spec.constants import DATASET_SLUGNAME
from sarus_data_spec.manager.ops.source.csv.inference import infer_dates
from sarus_data_spec.schema import Schema
from sarus_data_spec.schema import schema as schema_builder
from sarus_data_spec.type import Struct
import sarus_data_spec.protobuf as sp
import sarus_data_spec.typing as st
async def csv_schema(dataset: st.Dataset) -> Schema:
    """Build the Sarus schema of a CSV-backed dataset.

    The CSV file referenced by the dataset's single non-public source is
    loaded into an Arrow table, its column names are made unique and
    non-empty, and the resulting Arrow types are translated into a Sarus
    ``Struct`` which is then refined by date inference.

    Args:
        dataset: dataset whose sources contain exactly one non-public
            dataset pointing at a CSV file.

    Returns:
        The schema built for ``dataset``; the sniffed CSV delimiter is
        stored in its properties under the ``"delimiter"`` key.

    Raises:
        AssertionError: if the number of non-public sources is not one.
    """
    private_sources = [
        src
        for src in dataset.sources(sp.type_name(sp.Dataset))
        if not src.is_public()
    ]
    assert len(private_sources) == 1
    uri = private_sources[0].protobuf().spec.file.uri
    source_fs = dataset.manager().filesystem(uri)  # type:ignore
    table, parse_options = load_csv_table(source_fs, uri)

    # Columns typed as plain string are widened to large_string; every
    # other column keeps the type detected by the CSV reader.
    widened_fields = []
    for name, arrow_type in zip(table.schema.names, table.schema.types):
        target_type = (
            arrow_type if arrow_type != pa.string() else pa.large_string()
        )
        widened_fields.append(pa.field(name, target_type))
    table = table.cast(pa.schema(widened_fields))

    # Replace empty / duplicated headers by unique names.
    table = table.rename_columns(convert_names(table.schema.names))

    csv_fields = {}
    for col in table.schema.names:
        arrow_field = table.field(col)
        csv_fields[col] = type_from_arrow(
            arrow_type=arrow_field.type,
            nullable=arrow_field.nullable,
        )

    # Date inference runs on the whole table seen as one struct array.
    struct_array = pa.StructArray.from_arrays(
        arrays=[column.combine_chunks() for column in table.columns],
        names=table.column_names,
    )
    updated_type = infer_dates(
        _type=Struct(fields=csv_fields),
        arrow_array=struct_array,
    )

    return schema_builder(
        dataset=dataset,
        schema_type=updated_type,
        properties={"delimiter": parse_options.delimiter},
        name=dataset.properties().get(DATASET_SLUGNAME, None),
    )
def load_csv_table(
    source_fs: t.Any, uri: str
) -> t.Tuple[pa.lib.Table, pa._csv.ParseOptions]:
    """Load a CSV table from a given filesystem by its path.

    The first 100 lines of the file are decoded as UTF-8 and handed to
    ``csv_params_from_clevercsv`` to obtain the read and parse options;
    the whole file is then read and parsed with those options.

    Args:
        source_fs: filesystem-like object whose ``open(uri)`` returns a
            binary file usable as a context manager.
        uri: path of the CSV file on ``source_fs``.

    Returns:
        A tuple ``(table, parse_options)`` with the parsed Arrow table and
        the parse options that were used by the CSV reader.
    """
    with source_fs.open(uri) as f:
        sample = "".join([f.readline().decode("utf-8") for _ in range(100)])

    read_options, parse_options = csv_params_from_clevercsv(sample=sample)

    # A more exhaustive list of date formats could be provided here and it
    # would work. The problem is that it would then not be possible to know
    # in a straightforward way which format was matched, so inference of
    # less conventional dates is kept in the user input transform.
    convert_options = csv.ConvertOptions(
        strings_can_be_null=True,
        timestamp_parsers=[csv.ISO8601, "%Y-%m-%d"],
        true_values=["True"],
        false_values=["False"],
    )

    # Read the full content inside a context manager so the second file
    # handle is closed as well (it used to be left open).
    with source_fs.open(uri) as f:
        raw_content = f.read()

    table = csv.read_csv(
        pa.py_buffer(raw_content),
        read_options=read_options,
        parse_options=parse_options,
        convert_options=convert_options,
    )
    return table, parse_options
def csv_size_non_dp(source_fs: t.Any, uri: str) -> int:
    """Return the non-DP size, in rows, of the CSV file at ``uri``.

    The file is fully loaded through ``load_csv_table`` and the length of
    the resulting table is returned; the parse options are discarded.
    """
    loaded = load_csv_table(source_fs, uri)
    return len(loaded[0])
def csv_params_from_clevercsv(
    sample: str,
) -> t.Tuple[csv.ReadOptions, csv.ParseOptions]:
    """Derive pyarrow CSV read and parse options by sniffing ``sample``.

    Args:
        sample: text excerpt of the CSV file to analyse.

    Returns:
        A tuple ``(read_options, parse_options)``: column names are
        auto-generated when no header is detected, and the delimiter is
        the one of the sniffed dialect.

    Raises:
        AssertionError: if the sniffer returns a falsy dialect.
    """
    sniffer = clevercsv.Sniffer()
    dialect = sniffer.sniff(sample)
    assert dialect
    header_detected = sniffer.has_header(sample)
    return (
        csv.ReadOptions(autogenerate_column_names=not header_detected),
        csv.ParseOptions(delimiter=dialect.delimiter),
    )
def convert_names(columns: t.List[str]) -> t.List[str]:
    """Make column names non-empty and unique, editing ``columns`` in place.

    Empty names are replaced by a ``column_<n>`` name seeded with the
    column position; the second and later occurrences of a name are
    replaced by ``<name>_<n>`` seeded with the occurrence count. In both
    cases ``generate_original_column_name`` picks the first candidate that
    is not already present in the list.

    Args:
        columns: column names; this list is modified in place.

    Returns:
        The same list object, after renaming.
    """
    occurrences: t.Dict[str, int] = {}
    for position, name in enumerate(columns):
        if not name:
            # Missing header: fall back to a positional name.
            columns[position] = generate_original_column_name(
                position, columns, "column"
            )
            continue
        if name not in occurrences:
            # First time this name is seen: keep it untouched.
            occurrences[name] = 0
            continue
        # Repeated header: suffix it with its occurrence count.
        occurrences[name] += 1
        columns[position] = generate_original_column_name(
            occurrences[name], columns, name
        )
    return columns
def generate_original_column_name(
    idx: int, columns: t.List[str], base_name: str = "column"
) -> str:
    """Return the first ``<base_name>_<n>`` name absent from ``columns``.

    Candidates are tried for ``n = idx, idx + 1, ...`` until one is found
    that does not already exist. The search is iterative so that a long run
    of taken names cannot exhaust the interpreter's recursion limit.

    Args:
        idx: first suffix to try.
        columns: names that are already in use; not modified.
        base_name: prefix of the generated name.

    Returns:
        A name of the form ``f"{base_name}_{n}"`` with ``n >= idx`` that is
        not in ``columns``.
    """
    # Build the lookup set once instead of scanning the list per candidate.
    taken = set(columns)
    candidate = f"{base_name}_{idx}"
    while candidate in taken:
        idx += 1
        candidate = f"{base_name}_{idx}"
    return candidate