Repository URL to install this package:
|
Version:
4.5.4.dev1 ▾
|
import datetime
import re
import typing as t
from dateutil.parser import parse
import numpy as np
import pyarrow as pa
import sarus_data_spec.type as sdt
import sarus_data_spec.typing as st
class DateInferenceException(Exception):
    """Raised when a date/datetime format cannot be inferred from values."""
def infer_dates(
    _type: st.Type,
    arrow_array: pa.array,
) -> st.Type:
    """Return `_type` with text leaves promoted to Date/Datetime if possible.

    `_type` is visited together with `arrow_array`, the data it describes:

    - Text: when the longest string is shorter than 30 characters, a common
      strftime format is searched with `infer_date_format`. On success the
      type becomes `sdt.Datetime` if the format contains a time directive
      and `sdt.Date` otherwise. On any failure the Text type is kept.
    - Struct / Union / Optional: rebuilt with each child inferred
      recursively on the matching child array (nulls filtered out for
      Union fields and Optional).
    - Every other type is returned unchanged.

    Parameters
    ----------
    _type:
        The type to refine.
    arrow_array:
        The arrow data described by `_type`.

    Returns
    -------
    The refined type, or `_type` itself when nothing could be inferred.
    """

    class TypeConverter(st.TypeVisitor):
        # Result of the visit: stays the input type unless a handler
        # below replaces it.
        new_type: st.Type = _type

        def Text(
            self,
            encoding: str,
            possible_values: t.Iterable[str],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            max_length = pa.compute.max(
                pa.compute.utf8_length(arrow_array)
            ).as_py()
            # The maximum is None when the array is empty or holds only
            # nulls: there is nothing to infer from, keep the Text type.
            if max_length is None:
                return
            # 30 is the max length for a datetime formatted as text
            if max_length >= 30:
                return
            try:
                # TODO: infer_date_format can't find time only format
                # we need to modify it to be able to
                # guess as well time format
                date_format = infer_date_format(
                    arrow_array.to_numpy(zero_copy_only=False),
                )
                # Look for the strftime directives themselves rather than
                # bare letters, so that literal text kept in the format
                # (e.g. a weekday name such as "Mon") is not mistaken for
                # a time component.
                has_time = any(
                    directive in date_format
                    for directive in ("%H", "%M", "%S", "%f")
                )
                if has_time:
                    self.new_type = sdt.Datetime(
                        format=date_format,
                        possible_values=possible_values,
                        properties=properties,
                    )
                else:
                    self.new_type = sdt.Date(
                        format=date_format,
                        possible_values=possible_values,
                        properties=properties,
                    )
            except Exception:
                # Deliberate best effort: any failure during inference
                # means the column simply stays Text.
                pass

        def Float(
            self,
            min: float,
            max: float,
            base: st.FloatBase,
            possible_values: t.Iterable[float],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Struct(
            self,
            fields: t.Mapping[str, st.Type],
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            new_fields = {}
            # One child array per struct field, looked up by field index.
            old_arrs = arrow_array.flatten()
            for field_name, field_type in fields.items():
                idx = arrow_array.type.get_field_index(field_name)
                new_fields[field_name] = infer_dates(field_type, old_arrs[idx])
            self.new_type = sdt.Struct(
                fields=new_fields,
                name=name if name is not None else "",
                properties=properties,
            )

        def Union(
            self,
            fields: t.Mapping[str, st.Type],
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            new_fields = {}
            old_arrs = arrow_array.flatten()
            for field_name, field_type in fields.items():
                idx = arrow_array.type.get_field_index(field_name)
                # Only the valid (non-null) entries of each field are
                # used for the inference.
                new_fields[field_name] = infer_dates(
                    field_type, old_arrs[idx].filter(old_arrs[idx].is_valid())
                )
            self.new_type = sdt.Union(
                fields=new_fields,
                name=name if name is not None else "",
                properties=properties,
            )

        def Optional(
            self,
            type: st.Type,
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            # Infer on the non-null (and non-NaN) values only.
            self.new_type = sdt.Optional(
                type=infer_dates(
                    type,
                    arrow_array.filter(
                        pa.compute.invert(
                            arrow_array.is_null(nan_is_null=True)
                        )
                    ),
                ),
                name=name if name is not None else "",
                properties=properties,
            )

        # The handlers below do nothing: for these types the input type
        # is returned as is.

        def Datetime(
            self,
            format: str,
            min: str,
            max: str,
            base: st.DatetimeBase,
            possible_values: t.Iterable[str],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Date(
            self,
            format: str,
            min: str,
            max: str,
            base: st.DateBase,
            possible_values: t.Iterable[str],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Time(
            self,
            format: str,
            min: str,
            max: str,
            base: st.TimeBase,
            possible_values: t.Iterable[str],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Duration(
            self,
            unit: str,
            min: int,
            max: int,
            possible_values: t.Iterable[int],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Array(
            self,
            type: st.Type,
            shape: t.Tuple[int, ...],
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Boolean(
            self, properties: t.Optional[t.Mapping[str, str]] = None
        ) -> None:
            pass

        def Unit(
            self, properties: t.Optional[t.Mapping[str, str]] = None
        ) -> None:
            pass

        def Bytes(
            self, properties: t.Optional[t.Mapping[str, str]] = None
        ) -> None:
            pass

        def Constrained(
            self,
            type: st.Type,
            constraint: st.Predicate,
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Null(
            self, properties: t.Optional[t.Mapping[str, str]] = None
        ) -> None:
            pass

        def Enum(
            self,
            name: str,
            name_values: t.Sequence[t.Tuple[str, int]],
            ordered: bool,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Hypothesis(
            self,
            *types: t.Tuple[st.Type, float],
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Id(
            self,
            unique: bool,
            reference: t.Optional[st.Path] = None,
            base: t.Optional[st.IdBase] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def Integer(
            self,
            min: int,
            max: int,
            base: st.IntegerBase,
            possible_values: t.Iterable[int],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

        def List(
            self,
            type: st.Type,
            max_size: int,
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            pass

    visitor = TypeConverter()
    _type.accept(visitor)
    return visitor.new_type
def infer_date_format(values: t.List[str]) -> str:
    """Infer the single strftime format shared by all the values.

    A format is first guessed for each value individually. Each distinct
    guess is then checked against the whole collection: it is accepted
    only if parsing every value with it and formatting the result back
    reproduces the values exactly.

    Parameters
    ----------
    values: List[str]
        The strings from which to infer the format.

    Returns
    -------
    The first guessed format (in sorted order) that round-trips every
    value.

    Raises
    ------
    DateInferenceException
        If `values` is empty, if no format can be guessed for one of the
        values, or if no guessed format fits all of them. That is the
        behavior expected by the infer_dates visitor for Text columns.
    """
    # Without this guard an empty input fails with an IndexError inside
    # the day-first heuristic instead of the documented exception.
    if len(values) == 0:
        raise DateInferenceException(
            "cannot infer a date format from an empty collection"
        )

    day_first = infere_day_first(values)
    formats = []
    for value in values:
        guess_format = guess_datetime_format(value, day_first)
        if guess_format is None:
            raise DateInferenceException(
                "no date format could be guessed for one of the values"
            )
        formats.append(guess_format)

    unique_formats: t.List[str] = np.unique(formats).tolist()
    for candidate in unique_formats:
        try:
            round_tripped = [
                datetime.datetime.strptime(value, candidate).strftime(
                    candidate
                )
                for value in values
            ]
        except ValueError:
            # At least one value does not parse with this candidate.
            continue
        if np.array_equal(round_tripped, values):
            return candidate
    raise DateInferenceException("no single date format fits all the values")
# Distributed under the Apache 2 licence
# https://github.com/dateutil/dateutil/blob/master/LICENSE
#
# The following class is copied from
# https://github.com/dateutil/dateutil/pull/732
#
# We use this class to parse and tokenize date strings. However, as it is
# a private class in the dateutil library, relying on backwards compatibility
# is not practical. In fact, using this class issues warnings (xref gh-21322).
# Thus, we port the class over so that both issues are resolved.
#
# Copyright (c) 2017 - dateutil contributors
class _timelex:
    """Lexer splitting a date/time string into tokens (ported from dateutil)."""

    def __init__(self, instream: t.Any) -> None:
        """Keep the text to tokenize.

        `instream` may be a string, an object with a `decode` method
        (decoded first), or an object with a `read` method (read
        entirely). Anything else raises a TypeError.
        """
        if getattr(instream, "decode", None) is not None:  # pragma: no cover
            instream = instream.decode()

        if isinstance(instream, str):
            self.stream = instream
            return

        if getattr(instream, "read", None) is None:  # pragma: no cover
            raise TypeError(
                "Parser must be a string or character stream, not "
                f"{type(instream).__name__}"
            )
        self.stream = instream.read()  # pragma: no cover

    def get_tokens(self) -> t.Any:
        """
        Break the time string into lexical units (tokens).

        Tokens are delimited by changes in the character set: a run of
        letters is one token, a run of digits is one token. Dots are
        ambiguous since they can be separators ("Sep.20.2009") or decimal
        points ("4:30:21.447"); the pattern below only reads
        "digits.digits" as one number when it is not glued to another dot
        or digit on either side.
        """
        text = self.stream.replace("\x00", "")

        # TODO: Change \s --> \s+ (this doesn't match existing behavior)
        # TODO: change the punctuation block to punc+ (does not match existing)
        # TODO: can we merge the two digit patterns?
        alternatives = (
            r"\s",  # a single whitespace character
            r"(?<![\.\d])\d+\.\d+(?![\.\d])",  # an isolated decimal number
            r"\d+",  # a run of digits
            r"[a-zA-Z]+",  # a run of letters
            r"[\./:]+",  # a run of '.', '/' or ':'
            r"[^\da-zA-Z\./:\s]+",  # a run of anything else
        )
        tokens = re.findall("|".join(alternatives), text)

        # Re-combine token triples of the form ["59", ",", "456"] because
        # in this context the "," is treated as a decimal
        # (e.g. in python's default logging format).
        # Kludge to match ,-decimal behavior; it'd be better to do this
        # later in the process and have a simpler tokenization.
        # The loop walks a copy of the list (minus its last two items)
        # while the merged values are written to the list itself.
        snapshot = tokens[:-2]
        for n, token in enumerate(snapshot):
            if token is None or not token.isdigit():
                continue
            if tokens[n + 1] == "," and tokens[n + 2].isdigit():
                # pragma: no cover
                # TODO: I _really_ don't like faking the value here
                tokens[n] = token + "." + tokens[n + 2]
                tokens[n + 1] = None
                tokens[n + 2] = None

        # Drop the slots emptied by the merge above.
        return [item for item in tokens if item is not None]

    @classmethod
    def split(cls, s: t.Any) -> t.Any:
        """Tokenize `s` in one call."""
        lexer = cls(s)
        return lexer.get_tokens()
# pandas has an implementation (linked below) that we could adapt if
# needed; in particular, we could run into problems with the locale.
# https://github.com/pandas-dev/pandas/blob/097ff0c34179ab55d08a6ab0757762beea4d8785/pandas/_libs/tslibs/parsing.pyx#L819
# The limitation of the pandas version is that it is based only on the
# first non-NaN element.
def guess_datetime_format(
    dt_str: str,
    dayfirst: bool = False,
    dt_str_parse: t.Callable = parse,
    dt_str_split: t.Callable = _timelex.split,
) -> t.Optional[str]:
    """
    Guess the strftime/strptime format of one datetime string.

    The string is parsed into a datetime and split into tokens; each
    token is then matched against the rendering of a datetime attribute
    (year, month, ...) to find the directive it corresponds to.

    Parameters
    ----------
    dt_str : str
        Datetime string to guess the format of.
    dayfirst : bool, default False
        If True parses dates with the day first, eg 20/01/2005
        Warning: dayfirst=True is not strict, but will prefer to parse
        with day first (this is a known bug).
    dt_str_parse : function, defaults to `dateutil.parser.parse`
        Takes a datetime string (and a `dayfirst` keyword) and returns
        the `datetime.datetime` it represents.
    dt_str_split : function, defaults to `_timelex.split`
        Takes a datetime string and returns its parts as a list of
        strings, e.g. '2011/12/30' -> ['2011', '/', '12', '/', '30']

    Returns
    -------
    The format string, or None when no consistent format is found.
    """
    if dt_str_parse is None or dt_str_split is None:  # pragma: no cover
        return None
    if not isinstance(dt_str, str):  # pragma: no cover
        return None

    # Each rule: (datetime attributes, directive, zero-padding width).
    day_rule = (("day",), "%d", 2)
    rules = [
        (("year", "month", "day"), "%Y%m%d", 0),
        (("day", "month", "year"), "%d%m%Y", 0),
        (("year",), "%Y", 0),
        (("month",), "%B", 0),
        (("month",), "%b", 0),
        (("month",), "%m", 2),
        day_rule,
        (("hour",), "%H", 2),
        (("minute",), "%M", 2),
        (("second",), "%S", 2),
        (("microsecond",), "%f", 6),
        (("second", "microsecond"), "%S.%f", 0),
        (("tzinfo",), "%Z", 0),
    ]
    if dayfirst:  # pragma: no cover
        # Try the day rule before every other rule.
        rules = [day_rule] + [rule for rule in rules if rule is not day_rule]

    try:
        parsed = dt_str_parse(dt_str, dayfirst=dayfirst)
    except (ValueError, OverflowError):
        # In case the datetime can't be parsed, its format cannot be guessed
        return None
    if parsed is None:  # pragma: no cover
        return None

    # the default dt_str_split will never raise here; we assume
    # that any user-provided function will not either.
    tokens = dt_str_split(dt_str)
    directives: t.List[t.Any] = [None] * len(tokens)
    seen: t.Set[t.Any] = set()

    for attrs, directive, width in rules:
        # Once an attribute is placed in the format, skip the other
        # representations of that same attribute (a month can be written
        # in several ways).
        if not seen.isdisjoint(attrs):
            continue
        if any(getattr(parsed, name) is None for name in attrs):
            continue
        for slot, current in enumerate(directives):
            padded = tokens[slot].zfill(width)
            if current is not None:
                continue
            if padded != parsed.strftime(directive):
                continue
            # First free token rendering exactly like this attribute.
            directives[slot] = directive
            tokens[slot] = padded
            seen.update(attrs)
            break

    # Only consider it a valid guess if we have a year, month and day
    if not {"year", "month", "day"} <= seen:  # pragma: no cover
        return None

    pieces = []
    for token, directive in zip(tokens, directives):
        if directive is not None:
            # A recognised part: emit its placeholder (like %Y)
            pieces.append(directive)
            continue
        # pragma: no cover
        # Otherwise the token is kept literally (IE, the dashes in
        # "01-01-2013") unless it is numeric: a leftover number means it
        # was likely not parsed properly, so the guess is wrong.
        try:
            float(token)
        except ValueError:
            pieces.append(token)
        else:
            return None

    guessed_format = "".join(pieces)
    # rebuild string, capturing any inferred padding
    rebuilt = "".join(tokens)
    matches = parsed.strftime(guessed_format) == rebuilt
    return guessed_format if matches else None
def infere_day_first(values: t.List[str]) -> bool:
    """Infer if the list of values have a day first.

    The first value is tokenized to locate its short numeric tokens (at
    most two digits). If there are at least two of them, the token at the
    position of the first one is compared across all values: a maximum
    above 12 cannot be a month, so it is taken to be a day.

    Parameters
    ----------
    values: List[str]
        The date strings to inspect.

    Returns
    -------
    True when the first short numeric part exceeds 12 in some value,
    False otherwise, including for an empty input.
    """
    tokenized = [_timelex.split(dt_str) for dt_str in values]
    if not tokenized:
        # Nothing to look at: no evidence of a day-first layout.
        return False

    # isdecimal() (rather than isdigit()) guarantees int() accepts the
    # token: characters such as "²" are digits but not decimals.
    candidates = [
        position
        for position, token in enumerate(tokenized[0])
        if token.isdecimal() and len(token) <= 2
    ]
    if len(candidates) < 2:
        return False

    leading = candidates[0]
    # Values that are too short or not numeric at that position are
    # skipped instead of raising; the first value always qualifies, so
    # the sequence below is never empty.
    date_first_part_max = max(
        int(tokens[leading])
        for tokens in tokenized
        if len(tokens) > leading and tokens[leading].isdecimal()
    )
    # Above 12, this is not a month
    return date_first_part_max > 12