""" define the IntervalIndex """
from operator import le, lt
import textwrap
from typing import Any, Optional, Tuple, Union
import warnings
import numpy as np
from pandas._config import get_option
from pandas._libs import Timedelta, Timestamp, lib
from pandas._libs.interval import Interval, IntervalMixin, IntervalTree
from pandas.util._decorators import Appender, Substitution, cache_readonly
from pandas.util._exceptions import rewrite_exception
from pandas.core.dtypes.cast import (
find_common_type,
infer_dtype_from_scalar,
maybe_downcast_to_dtype,
)
from pandas.core.dtypes.common import (
ensure_platform_int,
is_datetime64tz_dtype,
is_datetime_or_timedelta_dtype,
is_dtype_equal,
is_float,
is_float_dtype,
is_integer,
is_integer_dtype,
is_interval_dtype,
is_list_like,
is_number,
is_object_dtype,
is_scalar,
)
from pandas.core.dtypes.generic import ABCSeries
from pandas.core.dtypes.missing import isna
from pandas._typing import AnyArrayLike
from pandas.core.arrays.interval import IntervalArray, _interval_shared_docs
import pandas.core.common as com
import pandas.core.indexes.base as ibase
from pandas.core.indexes.base import (
Index,
InvalidIndexError,
_index_shared_docs,
default_pprint,
ensure_index,
)
from pandas.core.indexes.datetimes import DatetimeIndex, date_range
from pandas.core.indexes.multi import MultiIndex
from pandas.core.indexes.timedeltas import TimedeltaIndex, timedelta_range
from pandas.core.ops import get_op_result_name
from pandas.tseries.frequencies import to_offset
from pandas.tseries.offsets import DateOffset
_VALID_CLOSED = {"left", "right", "both", "neither"}
_index_doc_kwargs = dict(ibase._index_doc_kwargs)
_index_doc_kwargs.update(
dict(
klass="IntervalIndex",
qualname="IntervalIndex",
target_klass="IntervalIndex or list of Intervals",
name=textwrap.dedent(
"""\
name : object, optional
Name to be stored in the index.
"""
),
)
)
def _get_next_label(label):
dtype = getattr(label, "dtype", type(label))
if isinstance(label, (Timestamp, Timedelta)):
dtype = "datetime64"
if is_datetime_or_timedelta_dtype(dtype) or is_datetime64tz_dtype(dtype):
return label + np.timedelta64(1, "ns")
elif is_integer_dtype(dtype):
return label + 1
elif is_float_dtype(dtype):
return np.nextafter(label, np.infty)
else:
raise TypeError(
"cannot determine next label for type {typ!r}".format(typ=type(label))
)
def _get_prev_label(label):
dtype = getattr(label, "dtype", type(label))
if isinstance(label, (Timestamp, Timedelta)):
dtype = "datetime64"
if is_datetime_or_timedelta_dtype(dtype) or is_datetime64tz_dtype(dtype):
return label - np.timedelta64(1, "ns")
elif is_integer_dtype(dtype):
return label - 1
elif is_float_dtype(dtype):
return np.nextafter(label, -np.infty)
else:
raise TypeError(
"cannot determine next label for type {typ!r}".format(typ=type(label))
)
def _get_interval_closed_bounds(interval):
"""
Given an Interval or IntervalIndex, return the corresponding interval with
closed bounds.
"""
left, right = interval.left, interval.right
if interval.open_left:
left = _get_next_label(left)
if interval.open_right:
right = _get_prev_label(right)
return left, right
def _new_IntervalIndex(cls, d):
"""
This is called upon unpickling, rather than the default which doesn't have
arguments and breaks __new__
"""
return cls.from_arrays(**d)
class SetopCheck:
"""
This is called to decorate the set operations of IntervalIndex
to perform the type check in advance.
"""
def __init__(self, op_name):
self.op_name = op_name
def __call__(self, setop):
def func(intvidx_self, other, sort=False):
intvidx_self._assert_can_do_setop(other)
other = ensure_index(other)
if not isinstance(other, IntervalIndex):
result = getattr(intvidx_self.astype(object), self.op_name)(other)
if self.op_name in ("difference",):
result = result.astype(intvidx_self.dtype)
return result
elif intvidx_self.closed != other.closed:
msg = (
"can only do set operations between two IntervalIndex "
"objects that are closed on the same side"
)
raise ValueError(msg)
# GH 19016: ensure set op will not return a prohibited dtype
subtypes = [intvidx_self.dtype.subtype, other.dtype.subtype]
common_subtype = find_common_type(subtypes)
if is_object_dtype(common_subtype):
msg = (
"can only do {op} between two IntervalIndex "
"objects that have compatible dtypes"
)
raise TypeError(msg.format(op=self.op_name))
return setop(intvidx_self, other, sort)
return func
@Appender(
_interval_shared_docs["class"]
% dict(
klass="IntervalIndex",
summary="Immutable index of intervals that are closed on the same side.",
name=_index_doc_kwargs["name"],
versionadded="0.20.0",
extra_attributes="is_overlapping\nvalues\n",
extra_methods="",
examples=textwrap.dedent(
"""\
Examples
--------
A new ``IntervalIndex`` is typically constructed using
:func:`interval_range`:
>>> pd.interval_range(start=0, end=5)
IntervalIndex([(0, 1], (1, 2], (2, 3], (3, 4], (4, 5]],
closed='right',
dtype='interval[int64]')
It may also be constructed using one of the constructor
methods: :meth:`IntervalIndex.from_arrays`,
:meth:`IntervalIndex.from_breaks`, and :meth:`IntervalIndex.from_tuples`.
See further examples in the doc strings of ``interval_range`` and the
mentioned constructor methods.
"""
),
)
)
class IntervalIndex(IntervalMixin, Index):
_typ = "intervalindex"
_comparables = ["name"]
_attributes = ["name", "closed"]
# we would like our indexing holder to defer to us
_defer_to_indexing = True
# Immutable, so we are able to cache computations like isna in '_mask'
_mask = None
# --------------------------------------------------------------------
# Constructors
def __new__(
cls, data, closed=None, dtype=None, copy=False, name=None, verify_integrity=True
):
if name is None and hasattr(data, "name"):
name = data.name
with rewrite_exception("IntervalArray", cls.__name__):
array = IntervalArray(
data,
closed=closed,
copy=copy,
dtype=dtype,
verify_integrity=verify_integrity,
)
return cls._simple_new(array, name)
@classmethod
def _simple_new(cls, array, name, closed=None):
"""
Construct from an IntervalArray
Parameters
----------
array : IntervalArray
name : str
Attached as result.name
closed : Any
Ignored.
"""
result = IntervalMixin.__new__(cls)
result._data = array
result.name = name
result._reset_identity()
return result
@classmethod
@Appender(_interval_shared_docs["from_breaks"] % _index_doc_kwargs)
def from_breaks(cls, breaks, closed="right", name=None, copy=False, dtype=None):
with rewrite_exception("IntervalArray", cls.__name__):
array = IntervalArray.from_breaks(
breaks, closed=closed, copy=copy, dtype=dtype
)
return cls._simple_new(array, name=name)
@classmethod
@Appender(_interval_shared_docs["from_arrays"] % _index_doc_kwargs)
def from_arrays(
cls, left, right, closed="right", name=None, copy=False, dtype=None
):
with rewrite_exception("IntervalArray", cls.__name__):
array = IntervalArray.from_arrays(
left, right, closed, copy=copy, dtype=dtype
)
return cls._simple_new(array, name=name)
@classmethod
@Appender(_interval_shared_docs["from_intervals"] % _index_doc_kwargs)
def from_intervals(cls, data, closed=None, name=None, copy=False, dtype=None):
msg = (
"IntervalIndex.from_intervals is deprecated and will be "
"removed in a future version; Use IntervalIndex(...) instead"
)
warnings.warn(msg, FutureWarning, stacklevel=2)
with rewrite_exception("IntervalArray", cls.__name__):
array = IntervalArray(data, closed=closed, copy=copy, dtype=dtype)
if name is None and isinstance(data, cls):
name = data.name
return cls._simple_new(array, name=name)
@classmethod
@Appender(_interval_shared_docs["from_tuples"] % _index_doc_kwargs)
def from_tuples(cls, data, closed="right", name=None, copy=False, dtype=None):
with rewrite_exception("IntervalArray", cls.__name__):
arr = IntervalArray.from_tuples(data, closed=closed, copy=copy, dtype=dtype)
return cls._simple_new(arr, name=name)
# --------------------------------------------------------------------
@Appender(_index_shared_docs["_shallow_copy"])
def _shallow_copy(self, left=None, right=None, **kwargs):
result = self._data._shallow_copy(left=left, right=right)
attributes = self._get_attributes_dict()
attributes.update(kwargs)
return self._simple_new(result, **attributes)
@cache_readonly
def _isnan(self):
"""Return a mask indicating if each value is NA"""
if self._mask is None:
self._mask = isna(self.left)
return self._mask
@cache_readonly
def _engine(self):
left = self._maybe_convert_i8(self.left)
right = self._maybe_convert_i8(self.right)
return IntervalTree(left, right, closed=self.closed)
def __contains__(self, key):
"""
return a boolean if this key is IN the index
We *only* accept an Interval
Parameters
----------
key : Interval
Returns
-------
boolean
"""
if not isinstance(key, Interval):
return False
try:
self.get_loc(key)
return True
except KeyError:
return False
@Appender(
_interval_shared_docs["to_tuples"]
% dict(
return_type="Index",
examples="""
Examples
--------
>>> idx = pd.IntervalIndex.from_arrays([0, np.nan, 2], [1, np.nan, 3])
>>> idx.to_tuples()
Index([(0.0, 1.0), (nan, nan), (2.0, 3.0)], dtype='object')
>>> idx.to_tuples(na_tuple=False)
Index([(0.0, 1.0), nan, (2.0, 3.0)], dtype='object')""",
)
)
def to_tuples(self, na_tuple=True):
tuples = self._data.to_tuples(na_tuple=na_tuple)
return Index(tuples)
@cache_readonly
def _multiindex(self):
return MultiIndex.from_arrays([self.left, self.right], names=["left", "right"])
@property
def left(self):
"""
Return the left endpoints of each Interval in the IntervalIndex as
an Index
"""
return self._data._left
@property
def right(self):
"""
Return the right endpoints of each Interval in the IntervalIndex as
an Index
"""
return self._data._right
@property
def closed(self):
"""
Whether the intervals are closed on the left-side, right-side, both or
neither
"""
return self._data._closed
@Appender(_interval_shared_docs["set_closed"] % _index_doc_kwargs)
def set_closed(self, closed):
if closed not in _VALID_CLOSED:
msg = "invalid option for 'closed': {closed}"
raise ValueError(msg.format(closed=closed))
# return self._shallow_copy(closed=closed)
array = self._data.set_closed(closed)
return self._simple_new(array, self.name)
@property
def length(self):
"""
Return an Index with entries denoting the length of each Interval in
the IntervalIndex
"""
return self._data.length
@property
def size(self):
# Avoid materializing ndarray[Interval]
return self._data.size
@property
def itemsize(self):
msg = (
"IntervalIndex.itemsize is deprecated and will be removed in "
"a future version"
)
warnings.warn(msg, FutureWarning, stacklevel=2)
# suppress the warning from the underlying left/right itemsize
with warnings.catch_warnings():
warnings.simplefilter("ignore")
return self.left.itemsize + self.right.itemsize
def __len__(self):
return len(self.left)
@cache_readonly
def values(self):
"""
Return the IntervalIndex's data as an IntervalArray.
"""
return self._data
@cache_readonly
def _values(self):
return self._data
@cache_readonly
def _ndarray_values(self):
return np.array(self._data)
def __array__(self, result=None):
""" the array interface, return my values """
return self._ndarray_values
def __array_wrap__(self, result, context=None):
# we don't want the superclass implementation
return result
def __reduce__(self):
d = dict(left=self.left, right=self.right)
d.update(self._get_attributes_dict())
return _new_IntervalIndex, (self.__class__, d), None
@Appender(_index_shared_docs["copy"])
def copy(self, deep=False, name=None):
array = self._data
if deep:
array = array.copy()
attributes = self._get_attributes_dict()
if name is not None:
attributes.update(name=name)
return self._simple_new(array, **attributes)
@Appender(_index_shared_docs["astype"])
def astype(self, dtype, copy=True):
with rewrite_exception("IntervalArray", self.__class__.__name__):
new_values = self.values.astype(dtype, copy=copy)
if is_interval_dtype(new_values):
return self._shallow_copy(new_values.left, new_values.right)
return super().astype(dtype, copy=copy)
@cache_readonly
def dtype(self):
"""Return the dtype object of the underlying data"""
return self._data.dtype
@property
def inferred_type(self):
"""Return a string of the type inferred from the values"""
return "interval"
@Appender(Index.memory_usage.__doc__)
def memory_usage(self, deep=False):
# we don't use an explicit engine
# so return the bytes here
return self.left.memory_usage(deep=deep) + self.right.memory_usage(deep=deep)
@cache_readonly
def mid(self):
"""
Return the midpoint of each Interval in the IntervalIndex as an Index
"""
return self._data.mid
@cache_readonly
def is_monotonic(self):
"""
Return True if the IntervalIndex is monotonic increasing (only equal or
increasing values), else False
"""
return self.is_monotonic_increasing
@cache_readonly
def is_monotonic_increasing(self):
"""
Return True if the IntervalIndex is monotonic increasing (only equal or
increasing values), else False
"""
return self._engine.is_monotonic_increasing
@cache_readonly
def is_monotonic_decreasing(self):
"""
Return True if the IntervalIndex is monotonic decreasing (only equal or
decreasing values), else False
"""
return self[::-1].is_monotonic_increasing
@cache_readonly
def is_unique(self):
"""
Return True if the IntervalIndex contains unique elements, else False
"""
left = self.left
right = self.right
if self.isna().sum() > 1:
return False
if left.is_unique or right.is_unique:
return True
seen_pairs = set()
check_idx = np.where(left.duplicated(keep=False))[0]
for idx in check_idx:
pair = (left[idx], right[idx])
if pair in seen_pairs:
return False
seen_pairs.add(pair)
return True
@cache_readonly
@Appender(_interval_shared_docs["is_non_overlapping_monotonic"] % _index_doc_kwargs)
def is_non_overlapping_monotonic(self):
return self._data.is_non_overlapping_monotonic
@property
def is_overlapping(self):
"""
Return True if the IntervalIndex has overlapping intervals, else False.
Two intervals overlap if they share a common point, including closed
endpoints. Intervals that only have an open endpoint in common do not
overlap.
.. versionadded:: 0.24.0
Returns
-------
bool
Boolean indicating if the IntervalIndex has overlapping intervals.
See Also
--------
Interval.overlaps : Check whether two Interval objects overlap.
IntervalIndex.overlaps : Check an IntervalIndex elementwise for
overlaps.
Examples
--------
>>> index = pd.IntervalIndex.from_tuples([(0, 2), (1, 3), (4, 5)])
>>> index
IntervalIndex([(0, 2], (1, 3], (4, 5]],
closed='right',
dtype='interval[int64]')
>>> index.is_overlapping
True
Intervals that share closed endpoints overlap:
>>> index = pd.interval_range(0, 3, closed='both')
>>> index
IntervalIndex([[0, 1], [1, 2], [2, 3]],
closed='both',
dtype='interval[int64]')
>>> index.is_overlapping
True
Intervals that only have an open endpoint in common do not overlap:
>>> index = pd.interval_range(0, 3, closed='left')
>>> index
IntervalIndex([[0, 1), [1, 2), [2, 3)],
closed='left',
dtype='interval[int64]')
>>> index.is_overlapping
False
"""
# GH 23309
return self._engine.is_overlapping
@Appender(_index_shared_docs["_convert_scalar_indexer"])
def _convert_scalar_indexer(self, key, kind=None):
if kind == "iloc":
return super()._convert_scalar_indexer(key, kind=kind)
return key
def _maybe_cast_slice_bound(self, label, side, kind):
return getattr(self, side)._maybe_cast_slice_bound(label, side, kind)
@Appender(_index_shared_docs["_convert_list_indexer"])
def _convert_list_indexer(self, keyarr, kind=None):
"""
we are passed a list-like indexer. Return the
indexer for matching intervals.
"""
locs = self.get_indexer_for(keyarr)
# we have missing values
if (locs == -1).any():
raise KeyError
return locs
def _maybe_cast_indexed(self, key):
"""
we need to cast the key, which could be a scalar
or an array-like to the type of our subtype
"""
if isinstance(key, IntervalIndex):
return key
subtype = self.dtype.subtype
if is_float_dtype(subtype):
if is_integer(key):
key = float(key)
elif isinstance(key, (np.ndarray, Index)):
key = key.astype("float64")
elif is_integer_dtype(subtype):
if is_integer(key):
key = int(key)
return key
def _can_reindex(self, indexer: np.ndarray) -> None:
"""
Check if we are allowing reindexing with this particular indexer.
Parameters
----------
indexer : an integer indexer
Raises
------
ValueError if its a duplicate axis
"""
# trying to reindex on an axis with duplicates
if self.is_overlapping and len(indexer):
raise ValueError("cannot reindex from an overlapping axis")
def _needs_i8_conversion(self, key):
"""
Check if a given key needs i8 conversion. Conversion is necessary for
Timestamp, Timedelta, DatetimeIndex, and TimedeltaIndex keys. An
Interval-like requires conversion if it's endpoints are one of the
aforementioned types.
Assumes that any list-like data has already been cast to an Index.
Parameters
----------
key : scalar or Index-like
The key that should be checked for i8 conversion
Returns
-------
boolean
"""
if is_interval_dtype(key) or isinstance(key, Interval):
return self._needs_i8_conversion(key.left)
i8_types = (Timestamp, Timedelta, DatetimeIndex, TimedeltaIndex)
return isinstance(key, i8_types)
def _maybe_convert_i8(self, key):
"""
Maybe convert a given key to it's equivalent i8 value(s). Used as a
preprocessing step prior to IntervalTree queries (self._engine), which
expects numeric data.
Parameters
----------
key : scalar or list-like
The key that should maybe be converted to i8.
Returns
-------
key: scalar or list-like
The original key if no conversion occured, int if converted scalar,
Int64Index if converted list-like.
"""
original = key
if is_list_like(key):
key = ensure_index(key)
if not self._needs_i8_conversion(key):
return original
scalar = is_scalar(key)
if is_interval_dtype(key) or isinstance(key, Interval):
# convert left/right and reconstruct
left = self._maybe_convert_i8(key.left)
right = self._maybe_convert_i8(key.right)
constructor = Interval if scalar else IntervalIndex.from_arrays
return constructor(left, right, closed=self.closed)
if scalar:
# Timestamp/Timedelta
key_dtype, key_i8 = infer_dtype_from_scalar(key, pandas_dtype=True)
else:
# DatetimeIndex/TimedeltaIndex
key_dtype, key_i8 = key.dtype, Index(key.asi8)
if key.hasnans:
# convert NaT from it's i8 value to np.nan so it's not viewed
# as a valid value, maybe causing errors (e.g. is_overlapping)
key_i8 = key_i8.where(~key._isnan)
# ensure consistency with IntervalIndex subtype
subtype = self.dtype.subtype
msg = (
"Cannot index an IntervalIndex of subtype {subtype} with "
"values of dtype {other}"
)
if not is_dtype_equal(subtype, key_dtype):
raise ValueError(msg.format(subtype=subtype, other=key_dtype))
return key_i8
def _check_method(self, method):
if method is None:
return
if method in ["bfill", "backfill", "pad", "ffill", "nearest"]:
msg = "method {method} not yet implemented for IntervalIndex"
raise NotImplementedError(msg.format(method=method))
raise ValueError("Invalid fill method")
def _searchsorted_monotonic(self, label, side, exclude_label=False):
if not self.is_non_overlapping_monotonic:
raise KeyError(
"can only get slices from an IntervalIndex if "
"bounds are non-overlapping and all monotonic "
"increasing or decreasing"
)
if isinstance(label, IntervalMixin):
msg = "Interval objects are not currently supported"
raise NotImplementedError(msg)
# GH 20921: "not is_monotonic_increasing" for the second condition
# instead of "is_monotonic_decreasing" to account for single element
# indexes being both increasing and decreasing
if (side == "left" and self.left.is_monotonic_increasing) or (
side == "right" and not self.left.is_monotonic_increasing
):
sub_idx = self.right
if self.open_right or exclude_label:
label = _get_next_label(label)
else:
sub_idx = self.left
if self.open_left or exclude_label:
label = _get_prev_label(label)
return sub_idx._searchsorted_monotonic(label, side)
def _find_non_overlapping_monotonic_bounds(self, key):
if isinstance(key, IntervalMixin):
start = self._searchsorted_monotonic(
key.left, "left", exclude_label=key.open_left
)
stop = self._searchsorted_monotonic(
key.right, "right", exclude_label=key.open_right
)
elif isinstance(key, slice):
# slice
start, stop = key.start, key.stop
if (key.step or 1) != 1:
raise NotImplementedError("cannot slice with a slice step")
if start is None:
start = 0
else:
start = self._searchsorted_monotonic(start, "left")
if stop is None:
stop = len(self)
else:
stop = self._searchsorted_monotonic(stop, "right")
else:
# scalar or index-like
start = self._searchsorted_monotonic(key, "left")
stop = self._searchsorted_monotonic(key, "right")
return start, stop
def get_loc(
self, key: Any, method: Optional[str] = None
) -> Union[int, slice, np.ndarray]:
"""
Get integer location, slice or boolean mask for requested label.
Parameters
----------
key : label
method : {None}, optional
* default: matches where the label is within an interval only.
Returns
-------
loc : int if unique index, slice if monotonic index, else mask
Examples
--------
>>> i1, i2 = pd.Interval(0, 1), pd.Interval(1, 2)
>>> index = pd.IntervalIndex([i1, i2])
>>> index.get_loc(1)
0
You can also supply a point inside an interval.
>>> index.get_loc(1.5)
1
If a label is in several intervals, you get the locations of all the
relevant intervals.
>>> i3 = pd.Interval(0, 2)
>>> overlapping_index = pd.IntervalIndex([i1, i2, i3])
>>> overlapping_index.get_loc(0.5)
array([ True, False, True])
Only exact matches will be returned if an interval is provided.
>>> index.get_loc(pd.Interval(0, 1))
0
"""
self._check_method(method)
# list-like are invalid labels for II but in some cases may work, e.g
# single element array of comparable type, so guard against them early
if is_list_like(key):
raise KeyError(key)
if isinstance(key, Interval):
if self.closed != key.closed:
raise KeyError(key)
mask = (self.left == key.left) & (self.right == key.right)
else:
# assume scalar
op_left = le if self.closed_left else lt
op_right = le if self.closed_right else lt
try:
mask = op_left(self.left, key) & op_right(key, self.right)
except TypeError:
# scalar is not comparable to II subtype --> invalid label
raise KeyError(key)
matches = mask.sum()
if matches == 0:
raise KeyError(key)
elif matches == 1:
return mask.argmax()
return lib.maybe_booleans_to_slice(mask.view("u1"))
@Substitution(
**dict(
_index_doc_kwargs,
**{
"raises_section": textwrap.dedent(
"""
Raises
------
NotImplementedError
If any method argument other than the default of
None is specified as these are not yet implemented.
"""
)
}
)
)
@Appender(_index_shared_docs["get_indexer"])
def get_indexer(
self,
target: AnyArrayLike,
method: Optional[str] = None,
limit: Optional[int] = None,
tolerance: Optional[Any] = None,
) -> np.ndarray:
self._check_method(method)
if self.is_overlapping:
msg = (
"cannot handle overlapping indices; use "
"IntervalIndex.get_indexer_non_unique"
)
raise InvalidIndexError(msg)
target = ensure_index(target)
if isinstance(target, IntervalIndex):
# equal indexes -> 1:1 positional match
if self.equals(target):
return np.arange(len(self), dtype="intp")
# different closed or incompatible subtype -> no matches
common_subtype = find_common_type(
[self.dtype.subtype, target.dtype.subtype]
)
if self.closed != target.closed or is_object_dtype(common_subtype):
return np.repeat(np.intp(-1), len(target))
# non-overlapping -> at most one match per interval in target
# want exact matches -> need both left/right to match, so defer to
# left/right get_indexer, compare elementwise, equality -> match
left_indexer = self.left.get_indexer(target.left)
right_indexer = self.right.get_indexer(target.right)
indexer = np.where(left_indexer == right_indexer, left_indexer, -1)
elif not is_object_dtype(target):
# homogeneous scalar index: use IntervalTree
target = self._maybe_convert_i8(target)
indexer = self._engine.get_indexer(target.values)
else:
# heterogeneous scalar index: defer elementwise to get_loc
# (non-overlapping so get_loc guarantees scalar of KeyError)
indexer = []
for key in target:
try:
loc = self.get_loc(key)
except KeyError:
loc = -1
indexer.append(loc)
return ensure_platform_int(indexer)
@Appender(_index_shared_docs["get_indexer_non_unique"] % _index_doc_kwargs)
def get_indexer_non_unique(
self, target: AnyArrayLike
) -> Tuple[np.ndarray, np.ndarray]:
target = ensure_index(target)
# check that target IntervalIndex is compatible
if isinstance(target, IntervalIndex):
common_subtype = find_common_type(
[self.dtype.subtype, target.dtype.subtype]
)
if self.closed != target.closed or is_object_dtype(common_subtype):
# different closed or incompatible subtype -> no matches
return np.repeat(-1, len(target)), np.arange(len(target))
if is_object_dtype(target) or isinstance(target, IntervalIndex):
# target might contain intervals: defer elementwise to get_loc
indexer, missing = [], []
for i, key in enumerate(target):
try:
locs = self.get_loc(key)
if isinstance(locs, slice):
locs = np.arange(locs.start, locs.stop, locs.step, dtype="intp")
locs = np.array(locs, ndmin=1)
except KeyError:
missing.append(i)
locs = np.array([-1])
indexer.append(locs)
indexer = np.concatenate(indexer)
else:
target = self._maybe_convert_i8(target)
indexer, missing = self._engine.get_indexer_non_unique(target.values)
return ensure_platform_int(indexer), ensure_platform_int(missing)
def get_indexer_for(self, target: AnyArrayLike, **kwargs) -> np.ndarray:
"""
Guaranteed return of an indexer even when overlapping.
This dispatches to get_indexer or get_indexer_non_unique
as appropriate.
Returns
-------
numpy.ndarray
List of indices.
"""
if self.is_overlapping:
return self.get_indexer_non_unique(target, **kwargs)[0]
return self.get_indexer(target, **kwargs)
@Appender(_index_shared_docs["get_value"] % _index_doc_kwargs)
def get_value(self, series: ABCSeries, key: Any) -> Any:
if com.is_bool_indexer(key):
loc = key
elif is_list_like(key):
if self.is_overlapping:
loc, missing = self.get_indexer_non_unique(key)
if len(missing):
raise KeyError
else:
loc = self.get_indexer(key)
elif isinstance(key, slice):
if not (key.step is None or key.step == 1):
raise ValueError("cannot support not-default step in a slice")
loc = self._convert_slice_indexer(key, kind="getitem")
else:
loc = self.get_loc(key)
return series.iloc[loc]
@Appender(_index_shared_docs["where"])
def where(self, cond, other=None):
if other is None:
other = self._na_value
values = np.where(cond, self.values, other)
return self._shallow_copy(values)
def delete(self, loc):
"""
Return a new IntervalIndex with passed location(-s) deleted
Returns
-------
new_index : IntervalIndex
"""
new_left = self.left.delete(loc)
new_right = self.right.delete(loc)
return self._shallow_copy(new_left, new_right)
def insert(self, loc, item):
"""
Return a new IntervalIndex inserting new item at location. Follows
Python list.append semantics for negative values. Only Interval
objects and NA can be inserted into an IntervalIndex
Parameters
----------
loc : int
item : object
Returns
-------
new_index : IntervalIndex
"""
if isinstance(item, Interval):
if item.closed != self.closed:
raise ValueError(
"inserted item must be closed on the same " "side as the index"
)
left_insert = item.left
right_insert = item.right
elif is_scalar(item) and isna(item):
# GH 18295
left_insert = right_insert = item
else:
raise ValueError(
"can only insert Interval objects and NA into " "an IntervalIndex"
)
new_left = self.left.insert(loc, left_insert)
new_right = self.right.insert(loc, right_insert)
return self._shallow_copy(new_left, new_right)
def _concat_same_dtype(self, to_concat, name):
"""
assert that we all have the same .closed
we allow a 0-len index here as well
"""
if not len({i.closed for i in to_concat if len(i)}) == 1:
msg = (
"can only append two IntervalIndex objects "
"that are closed on the same side"
)
raise ValueError(msg)
return super()._concat_same_dtype(to_concat, name)
@Appender(_index_shared_docs["take"] % _index_doc_kwargs)
def take(self, indices, axis=0, allow_fill=True, fill_value=None, **kwargs):
result = self._data.take(
indices, axis=axis, allow_fill=allow_fill, fill_value=fill_value, **kwargs
)
attributes = self._get_attributes_dict()
return self._simple_new(result, **attributes)
def __getitem__(self, value):
result = self._data[value]
if isinstance(result, IntervalArray):
return self._shallow_copy(result)
else:
# scalar
return result
# --------------------------------------------------------------------
# Rendering Methods
# __repr__ associated methods are based on MultiIndex
def _format_with_header(self, header, **kwargs):
return header + list(self._format_native_types(**kwargs))
def _format_native_types(self, na_rep="NaN", quoting=None, **kwargs):
# GH 28210: use base method but with different default na_rep
return super()._format_native_types(na_rep=na_rep, quoting=quoting, **kwargs)
def _format_data(self, name=None):
# TODO: integrate with categorical and make generic
# name argument is unused here; just for compat with base / categorical
n = len(self)
max_seq_items = min((get_option("display.max_seq_items") or n) // 10, 10)
formatter = str
if n == 0:
summary = "[]"
elif n == 1:
first = formatter(self[0])
summary = "[{first}]".format(first=first)
elif n == 2:
first = formatter(self[0])
last = formatter(self[-1])
summary = "[{first}, {last}]".format(first=first, last=last)
else:
if n > max_seq_items:
n = min(max_seq_items // 2, 10)
head = [formatter(x) for x in self[:n]]
tail = [formatter(x) for x in self[-n:]]
summary = "[{head} ... {tail}]".format(
head=", ".join(head), tail=", ".join(tail)
)
else:
tail = [formatter(x) for x in self]
summary = "[{tail}]".format(tail=", ".join(tail))
return summary + "," + self._format_space()
def _format_attrs(self):
attrs = [("closed", repr(self.closed))]
if self.name is not None:
attrs.append(("name", default_pprint(self.name)))
attrs.append(("dtype", "'{dtype}'".format(dtype=self.dtype)))
return attrs
def _format_space(self):
space = " " * (len(self.__class__.__name__) + 1)
return "\n{space}".format(space=space)
# --------------------------------------------------------------------
def argsort(self, *args, **kwargs):
return np.lexsort((self.right, self.left))
def equals(self, other):
"""
Determines if two IntervalIndex objects contain the same elements
"""
if self.is_(other):
return True
# if we can coerce to an II
# then we can compare
if not isinstance(other, IntervalIndex):
if not is_interval_dtype(other):
return False
other = Index(getattr(other, ".values", other))
return (
self.left.equals(other.left)
and self.right.equals(other.right)
and self.closed == other.closed
)
@Appender(_interval_shared_docs["contains"] % _index_doc_kwargs)
def contains(self, other):
return self._data.contains(other)
@Appender(_interval_shared_docs["overlaps"] % _index_doc_kwargs)
def overlaps(self, other):
return self._data.overlaps(other)
@Appender(_index_shared_docs["intersection"])
@SetopCheck(op_name="intersection")
def intersection(
self, other: "IntervalIndex", sort: bool = False
) -> "IntervalIndex":
if self.left.is_unique and self.right.is_unique:
taken = self._intersection_unique(other)
elif other.left.is_unique and other.right.is_unique and self.isna().sum() <= 1:
# Swap other/self if other is unique and self does not have
# multiple NaNs
taken = other._intersection_unique(self)
else:
# duplicates
taken = self._intersection_non_unique(other)
if sort is None:
taken = taken.sort_values()
return taken
def _intersection_unique(self, other: "IntervalIndex") -> "IntervalIndex":
"""
Used when the IntervalIndex does not have any common endpoint,
no mater left or right.
Return the intersection with another IntervalIndex.
Parameters
----------
other : IntervalIndex
Returns
-------
taken : IntervalIndex
"""
lindexer = self.left.get_indexer(other.left)
rindexer = self.right.get_indexer(other.right)
match = (lindexer == rindexer) & (lindexer != -1)
indexer = lindexer.take(match.nonzero()[0])
return self.take(indexer)
def _intersection_non_unique(self, other: "IntervalIndex") -> "IntervalIndex":
"""
Used when the IntervalIndex does have some common endpoints,
on either sides.
Return the intersection with another IntervalIndex.
Parameters
----------
other : IntervalIndex
Returns
-------
taken : IntervalIndex
"""
mask = np.zeros(len(self), dtype=bool)
if self.hasnans and other.hasnans:
first_nan_loc = np.arange(len(self))[self.isna()][0]
mask[first_nan_loc] = True
lmiss = other.left.get_indexer_non_unique(self.left)[1]
lmatch = np.setdiff1d(np.arange(len(self)), lmiss)
for i in lmatch:
potential = other.left.get_loc(self.left[i])
if is_scalar(potential):
if self.right[i] == other.right[potential]:
mask[i] = True
elif self.right[i] in other.right[potential]:
mask[i] = True
return self[mask]
def _setop(op_name, sort=None):
@SetopCheck(op_name=op_name)
def func(self, other, sort=sort):
result = getattr(self._multiindex, op_name)(other._multiindex, sort=sort)
result_name = get_op_result_name(self, other)
# GH 19101: ensure empty results have correct dtype
if result.empty:
result = result.values.astype(self.dtype.subtype)
else:
result = result.values
return type(self).from_tuples(result, closed=self.closed, name=result_name)
return func
@property
def is_all_dates(self):
"""
This is False even when left/right contain datetime-like objects,
as the check is done on the Interval itself
"""
return False
union = _setop("union")
difference = _setop("difference")
symmetric_difference = _setop("symmetric_difference")
# TODO: arithmetic operations
IntervalIndex._add_logical_methods_disabled()
def _is_valid_endpoint(endpoint):
"""helper for interval_range to check if start/end are valid types"""
return any(
[
is_number(endpoint),
isinstance(endpoint, Timestamp),
isinstance(endpoint, Timedelta),
endpoint is None,
]
)
def _is_type_compatible(a, b):
"""helper for interval_range to check type compat of start/end/freq"""
is_ts_compat = lambda x: isinstance(x, (Timestamp, DateOffset))
is_td_compat = lambda x: isinstance(x, (Timedelta, DateOffset))
return (
(is_number(a) and is_number(b))
or (is_ts_compat(a) and is_ts_compat(b))
or (is_td_compat(a) and is_td_compat(b))
or com._any_none(a, b)
)
def interval_range(
start=None, end=None, periods=None, freq=None, name=None, closed="right"
):
"""
Return a fixed frequency IntervalIndex
Parameters
----------
start : numeric or datetime-like, default None
Left bound for generating intervals
end : numeric or datetime-like, default None
Right bound for generating intervals
periods : integer, default None
Number of periods to generate
freq : numeric, string, or DateOffset, default None
The length of each interval. Must be consistent with the type of start
and end, e.g. 2 for numeric, or '5H' for datetime-like. Default is 1
for numeric and 'D' for datetime-like.
name : string, default None
Name of the resulting IntervalIndex
closed : {'left', 'right', 'both', 'neither'}, default 'right'
Whether the intervals are closed on the left-side, right-side, both
or neither.
Returns
-------
rng : IntervalIndex
See Also
--------
IntervalIndex : An Index of intervals that are all closed on the same side.
Notes
-----
Of the four parameters ``start``, ``end``, ``periods``, and ``freq``,
exactly three must be specified. If ``freq`` is omitted, the resulting
``IntervalIndex`` will have ``periods`` linearly spaced elements between
``start`` and ``end``, inclusively.
To learn more about datetime-like frequency strings, please see `this link
<http://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases>`__.
Examples
--------
Numeric ``start`` and ``end`` is supported.
>>> pd.interval_range(start=0, end=5)
IntervalIndex([(0, 1], (1, 2], (2, 3], (3, 4], (4, 5]],
closed='right', dtype='interval[int64]')
Additionally, datetime-like input is also supported.
>>> pd.interval_range(start=pd.Timestamp('2017-01-01'),
... end=pd.Timestamp('2017-01-04'))
IntervalIndex([(2017-01-01, 2017-01-02], (2017-01-02, 2017-01-03],
(2017-01-03, 2017-01-04]],
closed='right', dtype='interval[datetime64[ns]]')
The ``freq`` parameter specifies the frequency between the left and right.
endpoints of the individual intervals within the ``IntervalIndex``. For
numeric ``start`` and ``end``, the frequency must also be numeric.
>>> pd.interval_range(start=0, periods=4, freq=1.5)
IntervalIndex([(0.0, 1.5], (1.5, 3.0], (3.0, 4.5], (4.5, 6.0]],
closed='right', dtype='interval[float64]')
Similarly, for datetime-like ``start`` and ``end``, the frequency must be
convertible to a DateOffset.
>>> pd.interval_range(start=pd.Timestamp('2017-01-01'),
... periods=3, freq='MS')
IntervalIndex([(2017-01-01, 2017-02-01], (2017-02-01, 2017-03-01],
(2017-03-01, 2017-04-01]],
closed='right', dtype='interval[datetime64[ns]]')
Specify ``start``, ``end``, and ``periods``; the frequency is generated
automatically (linearly spaced).
>>> pd.interval_range(start=0, end=6, periods=4)
IntervalIndex([(0.0, 1.5], (1.5, 3.0], (3.0, 4.5], (4.5, 6.0]],
closed='right',
dtype='interval[float64]')
The ``closed`` parameter specifies which endpoints of the individual
intervals within the ``IntervalIndex`` are closed.
>>> pd.interval_range(end=5, periods=4, closed='both')
IntervalIndex([[1, 2], [2, 3], [3, 4], [4, 5]],
closed='both', dtype='interval[int64]')
"""
start = com.maybe_box_datetimelike(start)
end = com.maybe_box_datetimelike(end)
endpoint = start if start is not None else end
if freq is None and com._any_none(periods, start, end):
freq = 1 if is_number(endpoint) else "D"
if com.count_not_none(start, end, periods, freq) != 3:
raise ValueError(
"Of the four parameters: start, end, periods, and "
"freq, exactly three must be specified"
)
if not _is_valid_endpoint(start):
msg = "start must be numeric or datetime-like, got {start}"
raise ValueError(msg.format(start=start))
elif not _is_valid_endpoint(end):
msg = "end must be numeric or datetime-like, got {end}"
raise ValueError(msg.format(end=end))
if is_float(periods):
periods = int(periods)
elif not is_integer(periods) and periods is not None:
msg = "periods must be a number, got {periods}"
raise TypeError(msg.format(periods=periods))
if freq is not None and not is_number(freq):
try:
freq = to_offset(freq)
except ValueError:
raise ValueError(
"freq must be numeric or convertible to "
"DateOffset, got {freq}".format(freq=freq)
)
# verify type compatibility
if not all(
[
_is_type_compatible(start, end),
_is_type_compatible(start, freq),
_is_type_compatible(end, freq),
]
):
raise TypeError("start, end, freq need to be type compatible")
# +1 to convert interval count to breaks count (n breaks = n-1 intervals)
if periods is not None:
periods += 1
if is_number(endpoint):
# force consistency between start/end/freq (lower end if freq skips it)
if com._all_not_none(start, end, freq):
end -= (end - start) % freq
# compute the period/start/end if unspecified (at most one)
if periods is None:
periods = int((end - start) // freq) + 1
elif start is None:
start = end - (periods - 1) * freq
elif end is None:
end = start + (periods - 1) * freq
breaks = np.linspace(start, end, periods)
if all(is_integer(x) for x in com._not_none(start, end, freq)):
# np.linspace always produces float output
breaks = maybe_downcast_to_dtype(breaks, "int64")
else:
# delegate to the appropriate range function
if isinstance(endpoint, Timestamp):
range_func = date_range
else:
range_func = timedelta_range
breaks = range_func(start=start, end=end, periods=periods, freq=freq)
return IntervalIndex.from_breaks(breaks, name=name, closed=closed)