Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import datetime
import re
import typing as t

from dateutil.parser import parse
import numpy as np
import pyarrow as pa

import sarus_data_spec.type as sdt
import sarus_data_spec.typing as st


class DateInferenceException(Exception):
    """Raised when no date format can be inferred from a set of values."""


def infer_dates(
    _type: st.Type,
    arrow_array: pa.Array,
) -> st.Type:
    """Return ``_type`` with date-like text fields retyped as Date/Datetime.

    The type is walked with a visitor. ``Text`` types whose values all share
    one date format become ``Date`` or ``Datetime``; ``Struct``, ``Union``
    and ``Optional`` are rebuilt by recursing into their children; every
    other type is returned unchanged.

    Parameters
    ----------
    _type: st.Type
        The type describing ``arrow_array``.
    arrow_array: pa.Array
        The data used to infer the formats.

    Returns
    -------
    st.Type
        The (possibly) updated type.
    """

    class TypeConverter(st.TypeVisitor):
        # Result of the visit; stays equal to the input type unless a
        # visitor method below replaces it.
        new_type: st.Type = _type

        def Text(
            self,
            encoding: str,
            possible_values: t.Iterable[str],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            max_length = pa.compute.max(
                pa.compute.utf8_length(arrow_array)
            ).as_py()
            # max_length is None when the array is empty or only holds
            # nulls: there is nothing to infer from, keep the Text type.
            if max_length is None:
                return
            if max_length < 30:  # max length for a datetime formatted as text
                try:
                    # TODO: infer_date_format can't find time only format
                    # we need to modify it to be able to
                    # guess as well time format
                    date_format = infer_date_format(
                        arrow_array.to_numpy(zero_copy_only=False),
                    )
                    # Look for the full strftime directives rather than
                    # bare letters, so that literal text kept in the
                    # format (e.g. a weekday name) is not mistaken for a
                    # time component.
                    has_time = any(
                        time_format in date_format
                        for time_format in ("%M", "%H", "%S", "%f")
                    )
                    if has_time:
                        self.new_type = sdt.Datetime(
                            format=date_format,
                            possible_values=possible_values,
                            properties=properties,
                        )
                    else:
                        self.new_type = sdt.Date(
                            format=date_format,
                            possible_values=possible_values,
                            properties=properties,
                        )
                except Exception:
                    # Best effort: any failure means the column is not a
                    # date, so the original Text type is kept.
                    pass

        def Float(
            self,
            min: float,
            max: float,
            base: st.FloatBase,
            possible_values: t.Iterable[float],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Struct(
            self,
            fields: t.Mapping[str, st.Type],
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            # Infer each field from its own child array.
            new_fields = {}
            old_arrs = arrow_array.flatten()
            for field_name, field_type in fields.items():
                idx = arrow_array.type.get_field_index(field_name)
                new_fields[field_name] = infer_dates(field_type, old_arrs[idx])
            self.new_type = sdt.Struct(
                fields=new_fields,
                name=name if name is not None else "",
                properties=properties,
            )

        def Union(
            self,
            fields: t.Mapping[str, st.Type],
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            new_fields = {}
            old_arrs = arrow_array.flatten()
            for field_name, field_type in fields.items():
                idx = arrow_array.type.get_field_index(field_name)
                field_array = old_arrs[idx]
                # Only the valid (non-null) entries of a field are used
                # for the inference.
                new_fields[field_name] = infer_dates(
                    field_type, field_array.filter(field_array.is_valid())
                )
            self.new_type = sdt.Union(
                fields=new_fields,
                name=name if name is not None else "",
                properties=properties,
            )

        def Optional(
            self,
            type: st.Type,
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            # Nulls (and NaNs) are dropped before inferring the inner type.
            non_null = arrow_array.filter(
                pa.compute.invert(arrow_array.is_null(nan_is_null=True))
            )
            self.new_type = sdt.Optional(
                type=infer_dates(type, non_null),
                name=name if name is not None else "",
                properties=properties,
            )

        # All the remaining visitor methods are no-ops: the corresponding
        # types are returned as they are (and are not traversed).

        def Datetime(
            self,
            format: str,
            min: str,
            max: str,
            base: st.DatetimeBase,
            possible_values: t.Iterable[str],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Date(
            self,
            format: str,
            min: str,
            max: str,
            base: st.DateBase,
            possible_values: t.Iterable[str],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Time(
            self,
            format: str,
            min: str,
            max: str,
            base: st.TimeBase,
            possible_values: t.Iterable[str],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Duration(
            self,
            unit: str,
            min: int,
            max: int,
            possible_values: t.Iterable[int],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Array(
            self,
            type: st.Type,
            shape: t.Tuple[int, ...],
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Boolean(
            self, properties: t.Optional[t.Mapping[str, str]] = None
        ) -> None:
            pass

        def Unit(
            self, properties: t.Optional[t.Mapping[str, str]] = None
        ) -> None:
            pass

        def Bytes(
            self, properties: t.Optional[t.Mapping[str, str]] = None
        ) -> None:
            pass

        def Constrained(
            self,
            type: st.Type,
            constraint: st.Predicate,
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Null(
            self, properties: t.Optional[t.Mapping[str, str]] = None
        ) -> None:
            pass

        def Enum(
            self,
            name: str,
            name_values: t.Sequence[t.Tuple[str, int]],
            ordered: bool,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Hypothesis(
            self,
            *types: t.Tuple[st.Type, float],
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Id(
            self,
            unique: bool,
            reference: t.Optional[st.Path] = None,
            base: t.Optional[st.IdBase] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Integer(
            self,
            min: int,
            max: int,
            base: st.IntegerBase,
            possible_values: t.Iterable[int],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def List(
            self,
            type: st.Type,
            max_size: int,
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

    visitor = TypeConverter()
    _type.accept(visitor)
    return visitor.new_type


def infer_date_format(values: t.List[str]) -> str:
    """Infer the date format shared by all the given values.

    A format is first guessed for each value independently. Each distinct
    guess is then accepted only if every value can be parsed with it and
    formatted back to exactly the same string.

    Parameters
    ----------
    values: List[str]
        The values from which to infer the format. A numpy array of
        strings is accepted as well.

    Returns
    -------
    str
        The first guessed ``strftime``/``strptime`` format (in sorted
        order) that round-trips every value.

    Raises
    ------
    DateInferenceException
        If ``values`` is empty, if no format can be guessed for one of the
        values, or if no guessed format round-trips every value. Raising
        is the behavior expected by the infer_dates visitor for Text
        columns.
    """
    # ``values`` may be a numpy array, whose truth value is ambiguous:
    # test the length explicitly.
    if len(values) == 0:
        raise DateInferenceException

    day_first = infere_day_first(values)

    formats = []
    for value in values:
        guess_format = guess_datetime_format(value, day_first)

        if guess_format is None:
            raise DateInferenceException

        formats.append(guess_format)

    unique_formats: t.List[str] = np.unique(formats).tolist()
    for candidate in unique_formats:
        try:
            round_trips = all(
                datetime.datetime.strptime(value, candidate).strftime(
                    candidate
                )
                == value
                for value in values
            )
        except ValueError:
            # At least one value cannot be parsed with this format.
            continue
        if round_trips:
            return candidate
    raise DateInferenceException


# Distributed under the Apache 2 licence
# https://github.com/dateutil/dateutil/blob/master/LICENSE
#
# The following class is copied from
# https://github.com/dateutil/dateutil/pull/732
#
# We use this class to parse and tokenize date strings. However, as it is
# a private class in the dateutil library, relying on backwards compatibility
# is not practical. In fact, using this class issues warnings (xref gh-21322).
# Thus, we port the class over so that both issues are resolved.
#
# Copyright (c) 2017 - dateutil contributors
class _timelex:
    def __init__(self, instream: t.Any) -> None:
        if getattr(instream, "decode", None) is not None:  # pragma: no cover
            instream = instream.decode()

        if isinstance(instream, str):
            self.stream = instream
        elif getattr(instream, "read", None) is None:  # pragma: no cover
            raise TypeError(
                "Parser must be a string or character stream, not "
                f"{type(instream).__name__}"
            )
        else:  # pragma: no cover
            self.stream = instream.read()

    def get_tokens(self) -> t.Any:
        """
        This function breaks the time string into lexical units (tokens), which
        can be parsed by the parser. Lexical units are demarcated by changes in
        the character set, so any continuous string of letters is considered
        one unit, any continuous string of numbers is considered one unit.
        The main complication arises from the fact that dots ('.') can be used
        both as separators (e.g. "Sep.20.2009") or decimal points (e.g.
        "4:30:21.447"). As such, it is necessary to read the full context of
        any dot-separated strings before breaking it into tokens; as such, this
        function maintains a "token stack", for when the ambiguous context
        demands that multiple tokens be parsed at once.
        """
        # cdef:
        #     Py_ssize_t n

        stream = self.stream.replace("\x00", "")

        # TODO: Change \s --> \s+ (this doesn't match existing behavior)
        # TODO: change the punctuation block to punc+ (does not match existing)
        # TODO: can we merge the two digit patterns?
        tokens = re.findall(
            r"\s|"
            r"(?<![\.\d])\d+\.\d+(?![\.\d])"
            r"|\d+"
            r"|[a-zA-Z]+"
            r"|[\./:]+"
            r"|[^\da-zA-Z\./:\s]+",
            stream,
        )

        # Re-combine token tuples of the form ["59", ",", "456"] because
        # in this context the "," is treated as a decimal
        # (e.g. in python's default logging format)
        for n, token in enumerate(tokens[:-2]):
            # Kludge to match ,-decimal behavior; it'd be better to do this
            # later in the process and have a simpler tokenization
            if (
                token is not None
                and token.isdigit()
                and tokens[n + 1] == ","
                and tokens[n + 2].isdigit()
            ):  # pragma: no cover
                # Have to check None b/c it might be replaced during the loop
                # TODO: I _really_ don't faking the value here
                tokens[n] = token + "." + tokens[n + 2]
                tokens[n + 1] = None
                tokens[n + 2] = None

        tokens = [x for x in tokens if x is not None]
        return tokens

    @classmethod
    def split(cls, s: t.Any) -> t.Any:
        return cls(s).get_tokens()


# A pandas implementation of this function exists at the URL below. We could
# adapt it if needed, in particular if we run into problems with the locale:
# https://github.com/pandas-dev/pandas/blob/097ff0c34179ab55d08a6ab0757762beea4d8785/pandas/_libs/tslibs/parsing.pyx#L819
# A limitation of the pandas version is that it is based on the first
# non-NaN element only.
def guess_datetime_format(
    dt_str: str,
    dayfirst: bool = False,
    dt_str_parse: t.Callable = parse,
    dt_str_split: t.Callable = _timelex.split,
) -> t.Optional[str]:
    """
    Guess the `strftime`/`strptime` format of a single datetime string.

    The string is parsed into a datetime and split into tokens; each token
    is then matched against the rendering of one datetime attribute to
    rebuild a format string.

    Parameters
    ----------
    dt_str : str
        Datetime string to guess the format of.
    dayfirst : bool, default False
        If True parses dates with the day first, eg 20/01/2005
        Warning: dayfirst=True is not strict, but will prefer to parse
        with day first (this is a known bug).
    dt_str_parse : function, defaults to `dateutil.parser.parse`
        Takes a datetime string and returns the `datetime.datetime` it
        is guessed to represent.
    dt_str_split : function, defaults to `_timelex.split`
        Takes a datetime string and returns the list of its parts,
        e.g. '2011/12/30' -> ['2011', '/', '12', '/', '30']

    Returns
    -------
    ret : datetime format string, or None when no format can be guessed.
    """
    if (
        dt_str_parse is None
        or dt_str_split is None
        or not isinstance(dt_str, str)
    ):  # pragma: no cover
        return None

    # Each entry is (attribute names, format, zero-padding width if any).
    # The order matters: the first entry matching a token claims it.
    day_entry = (("day",), "%d", 2)
    before_day = [
        (("year", "month", "day"), "%Y%m%d", 0),
        (("day", "month", "year"), "%d%m%Y", 0),
        (("year",), "%Y", 0),
        (("month",), "%B", 0),
        (("month",), "%b", 0),
        (("month",), "%m", 2),
    ]
    after_day = [
        (("hour",), "%H", 2),
        (("minute",), "%M", 2),
        (("second",), "%S", 2),
        (("microsecond",), "%f", 6),
        (("second", "microsecond"), "%S.%f", 0),
        (("tzinfo",), "%Z", 0),
    ]
    if dayfirst:  # pragma: no cover
        # With dayfirst the day entry is tried before everything else.
        candidates = [day_entry, *before_day, *after_day]
    else:
        candidates = [*before_day, day_entry, *after_day]

    try:
        parsed = dt_str_parse(dt_str, dayfirst=dayfirst)
    except (ValueError, OverflowError):
        # A string that cannot be parsed has no guessable format.
        return None

    if parsed is None:  # pragma: no cover
        return None

    # the default dt_str_split from dateutil will never raise here; we assume
    #  that any user-provided function will not either.
    tokens = dt_str_split(dt_str)

    directives: t.List[t.Any] = [None] * len(tokens)
    matched_attrs: t.Set[t.Any] = set()

    for attr_names, directive, width in candidates:
        # An attribute already placed in the format is not matched again
        # through another of its representations (e.g. month as %B / %m).
        if not matched_attrs.isdisjoint(attr_names):
            continue

        if any(getattr(parsed, name) is None for name in attr_names):
            continue

        for position, current in enumerate(directives):
            if current is not None:
                continue
            padded = tokens[position].zfill(width)
            if padded == parsed.strftime(directive):
                directives[position] = directive
                # Keep the padded token so the final comparison accounts
                # for the inferred padding.
                tokens[position] = padded
                matched_attrs.update(attr_names)
                break

    # Only consider it a valid guess if we have a year, month and day
    if not {"year", "month", "day"} <= matched_attrs:  # pragma: no cover
        return None

    parts = []
    for token, directive in zip(tokens, directives):
        if directive is not None:
            # A matched token contributes its format placeholder (like %Y)
            parts.append(directive)
            continue
        # pragma: no cover
        # An unmatched token is kept verbatim (IE, the dashes in
        # "01-01-2013") unless it is numeric, in which case it was likely
        # not parsed properly and the guess is wrong.
        try:
            float(token)
        except ValueError:
            parts.append(token)
        else:
            return None

    guessed_format = "".join(parts)

    # rebuild string, capturing any inferred padding
    rebuilt = "".join(tokens)
    if parsed.strftime(guessed_format) != rebuilt:  # pragma: no cover
        return None
    return guessed_format


def infere_day_first(values: t.List[str]) -> bool:
    """Infer if the list of values have a day first.

    The first value is tokenized to locate the numeric tokens of at most
    two digits. When there are at least two of them, the first of these
    positions is inspected in every value: a number greater than 12 there
    cannot be a month, so the dates are taken to start with the day.

    Parameters
    ----------
    values: List[str]
        The date strings to inspect.

    Returns
    -------
    bool
        True if the first short numeric token holds a number greater than
        12 in at least one value, False otherwise (including when
        ``values`` is empty).
    """
    token_rows = [_timelex.split(dt_str) for dt_str in values]
    if not token_rows:
        # Nothing to infer from.
        return False

    candidates = [
        position
        for position, token in enumerate(token_rows[0])
        if token.isdigit() and len(token) <= 2
    ]

    if len(candidates) < 2:
        return False

    first_position = candidates[0]
    for row in token_rows:
        # Values shaped differently from the first one may be shorter or
        # hold a non numeric token at that position: they carry no
        # information here and are skipped.
        if first_position >= len(row):
            continue
        try:
            number = int(row[first_position])
        except ValueError:
            continue
        if number > 12:  # This is not a month
            return True
    return False