"""
Base and utility classes for tseries type pandas objects.
"""
import operator
from typing import Set
import warnings
import numpy as np
from pandas._libs import NaT, iNaT, lib
from pandas._libs.algos import unique_deltas
from pandas.compat.numpy import function as nv
from pandas.errors import AbstractMethodError
from pandas.util._decorators import Appender, cache_readonly, deprecate_kwarg
from pandas.core.dtypes.common import (
ensure_int64,
is_dtype_equal,
is_float,
is_integer,
is_list_like,
is_period_dtype,
is_scalar,
)
from pandas.core.dtypes.generic import ABCIndex, ABCIndexClass, ABCSeries
from pandas.core import algorithms, ops
from pandas.core.accessor import PandasDelegate
from pandas.core.arrays import ExtensionOpsMixin
from pandas.core.arrays.datetimelike import (
DatetimeLikeArrayMixin,
_ensure_datetimelike_to_i8,
)
import pandas.core.indexes.base as ibase
from pandas.core.indexes.base import Index, _index_shared_docs
from pandas.core.tools.timedeltas import to_timedelta
import pandas.io.formats.printing as printing
from pandas.tseries.frequencies import to_offset
_index_doc_kwargs = dict(ibase._index_doc_kwargs)
def ea_passthrough(array_method):
"""
Make an alias for a method of the underlying ExtensionArray.
Parameters
----------
array_method : method on an Array class
Returns
-------
method
"""
def method(self, *args, **kwargs):
return array_method(self._data, *args, **kwargs)
method.__name__ = array_method.__name__
method.__doc__ = array_method.__doc__
return method
class DatetimeIndexOpsMixin(ExtensionOpsMixin):
"""
common ops mixin to support a unified interface datetimelike Index
"""
_data = None
# DatetimeLikeArrayMixin assumes subclasses are mutable, so these are
# properties there. They can be made into cache_readonly for Index
# subclasses bc they are immutable
inferred_freq = cache_readonly(
DatetimeLikeArrayMixin.inferred_freq.fget # type: ignore
)
_isnan = cache_readonly(DatetimeLikeArrayMixin._isnan.fget) # type: ignore
hasnans = cache_readonly(DatetimeLikeArrayMixin._hasnans.fget) # type: ignore
_hasnans = hasnans # for index / array -agnostic code
_resolution = cache_readonly(
DatetimeLikeArrayMixin._resolution.fget # type: ignore
)
resolution = cache_readonly(DatetimeLikeArrayMixin.resolution.fget) # type: ignore
_maybe_mask_results = ea_passthrough(DatetimeLikeArrayMixin._maybe_mask_results)
__iter__ = ea_passthrough(DatetimeLikeArrayMixin.__iter__)
mean = ea_passthrough(DatetimeLikeArrayMixin.mean)
@property
def freq(self):
"""
Return the frequency object if it is set, otherwise None.
"""
return self._data.freq
@freq.setter
def freq(self, value):
# validation is handled by _data setter
self._data.freq = value
@property
def freqstr(self):
"""
Return the frequency object as a string if it is set, otherwise None.
"""
return self._data.freqstr
def unique(self, level=None):
if level is not None:
self._validate_index_level(level)
result = self._data.unique()
# Note: if `self` is already unique, then self.unique() should share
# a `freq` with self. If not already unique, then self.freq must be
# None, so again sharing freq is correct.
return self._shallow_copy(result._data)
@classmethod
def _create_comparison_method(cls, op):
"""
Create a comparison method that dispatches to ``cls.values``.
"""
def wrapper(self, other):
if isinstance(other, ABCSeries):
# the arrays defer to Series for comparison ops but the indexes
# don't, so we have to unwrap here.
other = other._values
result = op(self._data, maybe_unwrap_index(other))
return result
wrapper.__doc__ = op.__doc__
wrapper.__name__ = "__{}__".format(op.__name__)
return wrapper
@property
def _ndarray_values(self):
return self._data._ndarray_values
# ------------------------------------------------------------------------
# Abstract data attributes
@property
def values(self):
# Note: PeriodArray overrides this to return an ndarray of objects.
return self._data._data
@property # type: ignore # https://github.com/python/mypy/issues/1362
@Appender(DatetimeLikeArrayMixin.asi8.__doc__)
def asi8(self):
return self._data.asi8
# ------------------------------------------------------------------------
def equals(self, other):
"""
Determines if two Index objects contain the same elements.
"""
if self.is_(other):
return True
if not isinstance(other, ABCIndexClass):
return False
elif not isinstance(other, type(self)):
try:
other = type(self)(other)
except Exception:
return False
if not is_dtype_equal(self.dtype, other.dtype):
# have different timezone
return False
elif is_period_dtype(self):
if not is_period_dtype(other):
return False
if self.freq != other.freq:
return False
return np.array_equal(self.asi8, other.asi8)
@staticmethod
def _join_i8_wrapper(joinf, dtype, with_indexers=True):
"""
Create the join wrapper methods.
"""
from pandas.core.arrays.datetimelike import DatetimeLikeArrayMixin
@staticmethod
def wrapper(left, right):
if isinstance(
left, (np.ndarray, ABCIndex, ABCSeries, DatetimeLikeArrayMixin)
):
left = left.view("i8")
if isinstance(
right, (np.ndarray, ABCIndex, ABCSeries, DatetimeLikeArrayMixin)
):
right = right.view("i8")
results = joinf(left, right)
if with_indexers:
join_index, left_indexer, right_indexer = results
join_index = join_index.view(dtype)
return join_index, left_indexer, right_indexer
return results
return wrapper
def _ensure_localized(
self, arg, ambiguous="raise", nonexistent="raise", from_utc=False
):
# See DatetimeLikeArrayMixin._ensure_localized.__doc__
if getattr(self, "tz", None):
# ensure_localized is only relevant for tz-aware DTI
result = self._data._ensure_localized(
arg, ambiguous=ambiguous, nonexistent=nonexistent, from_utc=from_utc
)
return type(self)._simple_new(result, name=self.name)
return arg
def _box_values(self, values):
return self._data._box_values(values)
@Appender(_index_shared_docs["contains"] % _index_doc_kwargs)
def __contains__(self, key):
try:
res = self.get_loc(key)
return (
is_scalar(res)
or isinstance(res, slice)
or (is_list_like(res) and len(res))
)
except (KeyError, TypeError, ValueError):
return False
# Try to run function on index first, and then on elements of index
# Especially important for group-by functionality
def map(self, mapper, na_action=None):
try:
result = mapper(self)
# Try to use this result if we can
if isinstance(result, np.ndarray):
result = Index(result)
if not isinstance(result, Index):
raise TypeError("The map function must return an Index object")
return result
except Exception:
return self.astype(object).map(mapper)
def sort_values(self, return_indexer=False, ascending=True):
"""
Return sorted copy of Index.
"""
if return_indexer:
_as = self.argsort()
if not ascending:
_as = _as[::-1]
sorted_index = self.take(_as)
return sorted_index, _as
else:
sorted_values = np.sort(self._ndarray_values)
attribs = self._get_attributes_dict()
freq = attribs["freq"]
if freq is not None and not is_period_dtype(self):
if freq.n > 0 and not ascending:
freq = freq * -1
elif freq.n < 0 and ascending:
freq = freq * -1
attribs["freq"] = freq
if not ascending:
sorted_values = sorted_values[::-1]
return self._simple_new(sorted_values, **attribs)
@Appender(_index_shared_docs["take"] % _index_doc_kwargs)
def take(self, indices, axis=0, allow_fill=True, fill_value=None, **kwargs):
nv.validate_take(tuple(), kwargs)
indices = ensure_int64(indices)
maybe_slice = lib.maybe_indices_to_slice(indices, len(self))
if isinstance(maybe_slice, slice):
return self[maybe_slice]
taken = self._assert_take_fillable(
self.asi8,
indices,
allow_fill=allow_fill,
fill_value=fill_value,
na_value=iNaT,
)
# keep freq in PeriodArray/Index, reset otherwise
freq = self.freq if is_period_dtype(self) else None
return self._shallow_copy(taken, freq=freq)
_can_hold_na = True
_na_value = NaT
"""The expected NA value to use with this index."""
@property
def asobject(self):
"""
Return object Index which contains boxed values.
.. deprecated:: 0.23.0
Use ``astype(object)`` instead.
*this is an internal non-public method*
"""
warnings.warn(
"'asobject' is deprecated. Use 'astype(object)'" " instead",
FutureWarning,
stacklevel=2,
)
return self.astype(object)
def _convert_tolerance(self, tolerance, target):
tolerance = np.asarray(to_timedelta(tolerance).to_numpy())
if target.size != tolerance.size and tolerance.size > 1:
raise ValueError("list-like tolerance size must match " "target index size")
return tolerance
def tolist(self):
"""
Return a list of the underlying data.
"""
return list(self.astype(object))
def min(self, axis=None, skipna=True, *args, **kwargs):
"""
Return the minimum value of the Index or minimum along
an axis.
See Also
--------
numpy.ndarray.min
Series.min : Return the minimum value in a Series.
"""
nv.validate_min(args, kwargs)
nv.validate_minmax_axis(axis)
if not len(self):
return self._na_value
i8 = self.asi8
try:
# quick check
if len(i8) and self.is_monotonic:
if i8[0] != iNaT:
return self._box_func(i8[0])
if self.hasnans:
if skipna:
min_stamp = self[~self._isnan].asi8.min()
else:
return self._na_value
else:
min_stamp = i8.min()
return self._box_func(min_stamp)
except ValueError:
return self._na_value
def argmin(self, axis=None, skipna=True, *args, **kwargs):
"""
Returns the indices of the minimum values along an axis.
See `numpy.ndarray.argmin` for more information on the
`axis` parameter.
See Also
--------
numpy.ndarray.argmin
"""
nv.validate_argmin(args, kwargs)
nv.validate_minmax_axis(axis)
i8 = self.asi8
if self.hasnans:
mask = self._isnan
if mask.all() or not skipna:
return -1
i8 = i8.copy()
i8[mask] = np.iinfo("int64").max
return i8.argmin()
def max(self, axis=None, skipna=True, *args, **kwargs):
"""
Return the maximum value of the Index or maximum along
an axis.
See Also
--------
numpy.ndarray.max
Series.max : Return the maximum value in a Series.
"""
nv.validate_max(args, kwargs)
nv.validate_minmax_axis(axis)
if not len(self):
return self._na_value
i8 = self.asi8
try:
# quick check
if len(i8) and self.is_monotonic:
if i8[-1] != iNaT:
return self._box_func(i8[-1])
if self.hasnans:
if skipna:
max_stamp = self[~self._isnan].asi8.max()
else:
return self._na_value
else:
max_stamp = i8.max()
return self._box_func(max_stamp)
except ValueError:
return self._na_value
def argmax(self, axis=None, skipna=True, *args, **kwargs):
"""
Returns the indices of the maximum values along an axis.
See `numpy.ndarray.argmax` for more information on the
`axis` parameter.
See Also
--------
numpy.ndarray.argmax
"""
nv.validate_argmax(args, kwargs)
nv.validate_minmax_axis(axis)
i8 = self.asi8
if self.hasnans:
mask = self._isnan
if mask.all() or not skipna:
return -1
i8 = i8.copy()
i8[mask] = 0
return i8.argmax()
# --------------------------------------------------------------------
# Rendering Methods
def _format_with_header(self, header, na_rep="NaT", **kwargs):
return header + list(self._format_native_types(na_rep, **kwargs))
@property
def _formatter_func(self):
raise AbstractMethodError(self)
def _format_attrs(self):
"""
Return a list of tuples of the (attr,formatted_value).
"""
attrs = super()._format_attrs()
for attrib in self._attributes:
if attrib == "freq":
freq = self.freqstr
if freq is not None:
freq = "'%s'" % freq
attrs.append(("freq", freq))
return attrs
# --------------------------------------------------------------------
def _convert_scalar_indexer(self, key, kind=None):
"""
We don't allow integer or float indexing on datetime-like when using
loc.
Parameters
----------
key : label of the slice bound
kind : {'ix', 'loc', 'getitem', 'iloc'} or None
"""
assert kind in ["ix", "loc", "getitem", "iloc", None]
# we don't allow integer/float indexing for loc
# we don't allow float indexing for ix/getitem
if is_scalar(key):
is_int = is_integer(key)
is_flt = is_float(key)
if kind in ["loc"] and (is_int or is_flt):
self._invalid_indexer("index", key)
elif kind in ["ix", "getitem"] and is_flt:
self._invalid_indexer("index", key)
return super()._convert_scalar_indexer(key, kind=kind)
@classmethod
def _add_datetimelike_methods(cls):
"""
Add in the datetimelike methods (as we may have to override the
superclass).
"""
def __add__(self, other):
# dispatch to ExtensionArray implementation
result = self._data.__add__(maybe_unwrap_index(other))
return wrap_arithmetic_op(self, other, result)
cls.__add__ = __add__
def __radd__(self, other):
# alias for __add__
return self.__add__(other)
cls.__radd__ = __radd__
def __sub__(self, other):
# dispatch to ExtensionArray implementation
result = self._data.__sub__(maybe_unwrap_index(other))
return wrap_arithmetic_op(self, other, result)
cls.__sub__ = __sub__
def __rsub__(self, other):
result = self._data.__rsub__(maybe_unwrap_index(other))
return wrap_arithmetic_op(self, other, result)
cls.__rsub__ = __rsub__
def isin(self, values, level=None):
"""
Compute boolean array of whether each index value is found in the
passed set of values.
Parameters
----------
values : set or sequence of values
Returns
-------
is_contained : ndarray (boolean dtype)
"""
if level is not None:
self._validate_index_level(level)
if not isinstance(values, type(self)):
try:
values = type(self)(values)
except ValueError:
return self.astype(object).isin(values)
return algorithms.isin(self.asi8, values.asi8)
def intersection(self, other, sort=False):
self._validate_sort_keyword(sort)
self._assert_can_do_setop(other)
if self.equals(other):
return self._get_reconciled_name_object(other)
if len(self) == 0:
return self.copy()
if len(other) == 0:
return other.copy()
if not isinstance(other, type(self)):
result = Index.intersection(self, other, sort=sort)
if isinstance(result, type(self)):
if result.freq is None:
result.freq = to_offset(result.inferred_freq)
return result
elif (
other.freq is None
or self.freq is None
or other.freq != self.freq
or not other.freq.isAnchored()
or (not self.is_monotonic or not other.is_monotonic)
):
result = Index.intersection(self, other, sort=sort)
# Invalidate the freq of `result`, which may not be correct at
# this point, depending on the values.
result.freq = None
if hasattr(self, "tz"):
result = self._shallow_copy(
result._values, name=result.name, tz=result.tz, freq=None
)
else:
result = self._shallow_copy(result._values, name=result.name, freq=None)
if result.freq is None:
result.freq = to_offset(result.inferred_freq)
return result
# to make our life easier, "sort" the two ranges
if self[0] <= other[0]:
left, right = self, other
else:
left, right = other, self
# after sorting, the intersection always starts with the right index
# and ends with the index of which the last elements is smallest
end = min(left[-1], right[-1])
start = right[0]
if end < start:
return type(self)(data=[])
else:
lslice = slice(*left.slice_locs(start, end))
left_chunk = left.values[lslice]
return self._shallow_copy(left_chunk)
@Appender(_index_shared_docs["repeat"] % _index_doc_kwargs)
def repeat(self, repeats, axis=None):
nv.validate_repeat(tuple(), dict(axis=axis))
freq = self.freq if is_period_dtype(self) else None
return self._shallow_copy(self.asi8.repeat(repeats), freq=freq)
@Appender(_index_shared_docs["where"] % _index_doc_kwargs)
def where(self, cond, other=None):
other = _ensure_datetimelike_to_i8(other, to_utc=True)
values = _ensure_datetimelike_to_i8(self, to_utc=True)
result = np.where(cond, values, other).astype("i8")
result = self._ensure_localized(result, from_utc=True)
return self._shallow_copy(result)
def _summary(self, name=None):
"""
Return a summarized representation.
Parameters
----------
name : str
name to use in the summary representation
Returns
-------
String with a summarized representation of the index
"""
formatter = self._formatter_func
if len(self) > 0:
index_summary = ", %s to %s" % (formatter(self[0]), formatter(self[-1]))
else:
index_summary = ""
if name is None:
name = type(self).__name__
result = "%s: %s entries%s" % (
printing.pprint_thing(name),
len(self),
index_summary,
)
if self.freq:
result += "\nFreq: %s" % self.freqstr
# display as values, not quoted
result = result.replace("'", "")
return result
def _concat_same_dtype(self, to_concat, name):
"""
Concatenate to_concat which has the same class.
"""
attribs = self._get_attributes_dict()
attribs["name"] = name
# do not pass tz to set because tzlocal cannot be hashed
if len({str(x.dtype) for x in to_concat}) != 1:
raise ValueError("to_concat must have the same tz")
new_data = type(self._values)._concat_same_type(to_concat).asi8
# GH 3232: If the concat result is evenly spaced, we can retain the
# original frequency
is_diff_evenly_spaced = len(unique_deltas(new_data)) == 1
if not is_period_dtype(self) and not is_diff_evenly_spaced:
# reset freq
attribs["freq"] = None
return self._simple_new(new_data, **attribs)
@Appender(_index_shared_docs["astype"])
def astype(self, dtype, copy=True):
if is_dtype_equal(self.dtype, dtype) and copy is False:
# Ensure that self.astype(self.dtype) is self
return self
new_values = self._data.astype(dtype, copy=copy)
# pass copy=False because any copying will be done in the
# _data.astype call above
return Index(new_values, dtype=new_values.dtype, name=self.name, copy=False)
@deprecate_kwarg(old_arg_name="n", new_arg_name="periods")
def shift(self, periods, freq=None):
"""
Shift index by desired number of time frequency increments.
This method is for shifting the values of datetime-like indexes
by a specified time increment a given number of times.
Parameters
----------
periods : int
Number of periods (or increments) to shift by,
can be positive or negative.
.. versionchanged:: 0.24.0
freq : pandas.DateOffset, pandas.Timedelta or string, optional
Frequency increment to shift by.
If None, the index is shifted by its own `freq` attribute.
Offset aliases are valid strings, e.g., 'D', 'W', 'M' etc.
Returns
-------
pandas.DatetimeIndex
Shifted index.
See Also
--------
Index.shift : Shift values of Index.
PeriodIndex.shift : Shift values of PeriodIndex.
"""
result = self._data._time_shift(periods, freq=freq)
return type(self)(result, name=self.name)
def wrap_arithmetic_op(self, other, result):
if result is NotImplemented:
return NotImplemented
if isinstance(result, tuple):
# divmod, rdivmod
assert len(result) == 2
return (
wrap_arithmetic_op(self, other, result[0]),
wrap_arithmetic_op(self, other, result[1]),
)
if not isinstance(result, Index):
# Index.__new__ will choose appropriate subclass for dtype
result = Index(result)
res_name = ops.get_op_result_name(self, other)
result.name = res_name
return result
def maybe_unwrap_index(obj):
"""
If operating against another Index object, we need to unwrap the underlying
data before deferring to the DatetimeArray/TimedeltaArray/PeriodArray
implementation, otherwise we will incorrectly return NotImplemented.
Parameters
----------
obj : object
Returns
-------
unwrapped object
"""
if isinstance(obj, ABCIndexClass):
return obj._data
return obj
class DatetimelikeDelegateMixin(PandasDelegate):
"""
Delegation mechanism, specific for Datetime, Timedelta, and Period types.
Functionality is delegated from the Index class to an Array class. A
few things can be customized
* _delegate_class : type
The class being delegated to.
* _delegated_methods, delegated_properties : List
The list of property / method names being delagated.
* raw_methods : Set
The set of methods whose results should should *not* be
boxed in an index, after being returned from the array
* raw_properties : Set
The set of properties whose results should should *not* be
boxed in an index, after being returned from the array
"""
# raw_methods : dispatch methods that shouldn't be boxed in an Index
_raw_methods = set() # type: Set[str]
# raw_properties : dispatch properties that shouldn't be boxed in an Index
_raw_properties = set() # type: Set[str]
name = None
_data = None
@property
def _delegate_class(self):
raise AbstractMethodError
def _delegate_property_get(self, name, *args, **kwargs):
result = getattr(self._data, name)
if name not in self._raw_properties:
result = Index(result, name=self.name)
return result
def _delegate_property_set(self, name, value, *args, **kwargs):
setattr(self._data, name, value)
def _delegate_method(self, name, *args, **kwargs):
result = operator.methodcaller(name, *args, **kwargs)(self._data)
if name not in self._raw_methods:
result = Index(result, name=self.name)
return result