# -*- coding: utf-8 -*-
from __future__ import division
from datetime import timedelta
import textwrap
import warnings
import numpy as np
from pandas._libs import lib, tslibs
from pandas._libs.tslibs import NaT, Timedelta, Timestamp, iNaT
from pandas._libs.tslibs.fields import get_timedelta_field
from pandas._libs.tslibs.timedeltas import (
array_to_timedelta64, parse_timedelta_unit, precision_from_unit)
import pandas.compat as compat
from pandas.util._decorators import Appender
from pandas.core.dtypes.common import (
_NS_DTYPE, _TD_DTYPE, ensure_int64, is_datetime64_dtype, is_dtype_equal,
is_float_dtype, is_integer_dtype, is_list_like, is_object_dtype, is_scalar,
is_string_dtype, is_timedelta64_dtype, is_timedelta64_ns_dtype,
pandas_dtype)
from pandas.core.dtypes.dtypes import DatetimeTZDtype
from pandas.core.dtypes.generic import (
ABCDataFrame, ABCIndexClass, ABCSeries, ABCTimedeltaIndex)
from pandas.core.dtypes.missing import isna
from pandas.core import ops
from pandas.core.algorithms import checked_add_with_arr
import pandas.core.common as com
from pandas.tseries.frequencies import to_offset
from pandas.tseries.offsets import Tick
from . import datetimelike as dtl
_BAD_DTYPE = "dtype {dtype} cannot be converted to timedelta64[ns]"
def _is_convertible_to_td(key):
return isinstance(key, (Tick, timedelta,
np.timedelta64, compat.string_types))
def _field_accessor(name, alias, docstring=None):
def f(self):
values = self.asi8
result = get_timedelta_field(values, alias)
if self._hasnans:
result = self._maybe_mask_results(result, fill_value=None,
convert='float64')
return result
f.__name__ = name
f.__doc__ = "\n{}\n".format(docstring)
return property(f)
def _td_array_cmp(cls, op):
"""
Wrap comparison operations to convert timedelta-like to timedelta64
"""
opname = '__{name}__'.format(name=op.__name__)
nat_result = True if opname == '__ne__' else False
def wrapper(self, other):
if isinstance(other, (ABCDataFrame, ABCSeries, ABCIndexClass)):
return NotImplemented
if _is_convertible_to_td(other) or other is NaT:
try:
other = Timedelta(other)
except ValueError:
# failed to parse as timedelta
return ops.invalid_comparison(self, other, op)
result = op(self.view('i8'), other.value)
if isna(other):
result.fill(nat_result)
elif not is_list_like(other):
return ops.invalid_comparison(self, other, op)
elif len(other) != len(self):
raise ValueError("Lengths must match")
else:
try:
other = type(self)._from_sequence(other)._data
except (ValueError, TypeError):
return ops.invalid_comparison(self, other, op)
result = op(self.view('i8'), other.view('i8'))
result = com.values_from_object(result)
o_mask = np.array(isna(other))
if o_mask.any():
result[o_mask] = nat_result
if self._hasnans:
result[self._isnan] = nat_result
return result
return compat.set_function_name(wrapper, opname, cls)
class TimedeltaArray(dtl.DatetimeLikeArrayMixin, dtl.TimelikeOps):
"""
Pandas ExtensionArray for timedelta data.
.. versionadded:: 0.24.0
.. warning::
TimedeltaArray is currently experimental, and its API may change
without warning. In particular, :attr:`TimedeltaArray.dtype` is
expected to change to be an instance of an ``ExtensionDtype``
subclass.
Parameters
----------
values : array-like
The timedelta data.
dtype : numpy.dtype
Currently, only ``numpy.dtype("timedelta64[ns]")`` is accepted.
freq : Offset, optional
copy : bool, default False
Whether to copy the underlying array of data.
"""
_typ = "timedeltaarray"
_scalar_type = Timedelta
__array_priority__ = 1000
# define my properties & methods for delegation
_other_ops = []
_bool_ops = []
_object_ops = ['freq']
_field_ops = ['days', 'seconds', 'microseconds', 'nanoseconds']
_datetimelike_ops = _field_ops + _object_ops + _bool_ops
_datetimelike_methods = ["to_pytimedelta", "total_seconds",
"round", "floor", "ceil"]
# Needed so that NaT.__richcmp__(DateTimeArray) operates pointwise
ndim = 1
@property
def _box_func(self):
return lambda x: Timedelta(x, unit='ns')
@property
def dtype(self):
"""
The dtype for the TimedeltaArray.
.. warning::
A future version of pandas will change dtype to be an instance
of a :class:`pandas.api.extensions.ExtensionDtype` subclass,
not a ``numpy.dtype``.
Returns
-------
numpy.dtype
"""
return _TD_DTYPE
# ----------------------------------------------------------------
# Constructors
_attributes = ["freq"]
def __init__(self, values, dtype=_TD_DTYPE, freq=None, copy=False):
if isinstance(values, (ABCSeries, ABCIndexClass)):
values = values._values
inferred_freq = getattr(values, "_freq", None)
if isinstance(values, type(self)):
if freq is None:
freq = values.freq
elif freq and values.freq:
freq = to_offset(freq)
freq, _ = dtl.validate_inferred_freq(freq, values.freq, False)
values = values._data
if not isinstance(values, np.ndarray):
msg = (
"Unexpected type '{}'. 'values' must be a TimedeltaArray "
"ndarray, or Series or Index containing one of those."
)
raise ValueError(msg.format(type(values).__name__))
if values.ndim != 1:
raise ValueError("Only 1-dimensional input arrays are supported.")
if values.dtype == 'i8':
# for compat with datetime/timedelta/period shared methods,
# we can sometimes get here with int64 values. These represent
# nanosecond UTC (or tz-naive) unix timestamps
values = values.view(_TD_DTYPE)
_validate_td64_dtype(values.dtype)
dtype = _validate_td64_dtype(dtype)
if freq == "infer":
msg = (
"Frequency inference not allowed in TimedeltaArray.__init__. "
"Use 'pd.array()' instead."
)
raise ValueError(msg)
if copy:
values = values.copy()
if freq:
freq = to_offset(freq)
self._data = values
self._dtype = dtype
self._freq = freq
if inferred_freq is None and freq is not None:
type(self)._validate_frequency(self, freq)
@classmethod
def _simple_new(cls, values, freq=None, dtype=_TD_DTYPE):
assert dtype == _TD_DTYPE, dtype
assert isinstance(values, np.ndarray), type(values)
result = object.__new__(cls)
result._data = values.view(_TD_DTYPE)
result._freq = to_offset(freq)
result._dtype = _TD_DTYPE
return result
@classmethod
def _from_sequence(cls, data, dtype=_TD_DTYPE, copy=False,
freq=None, unit=None):
if dtype:
_validate_td64_dtype(dtype)
freq, freq_infer = dtl.maybe_infer_freq(freq)
data, inferred_freq = sequence_to_td64ns(data, copy=copy, unit=unit)
freq, freq_infer = dtl.validate_inferred_freq(freq, inferred_freq,
freq_infer)
result = cls._simple_new(data, freq=freq)
if inferred_freq is None and freq is not None:
# this condition precludes `freq_infer`
cls._validate_frequency(result, freq)
elif freq_infer:
# Set _freq directly to bypass duplicative _validate_frequency
# check.
result._freq = to_offset(result.inferred_freq)
return result
@classmethod
def _generate_range(cls, start, end, periods, freq, closed=None):
periods = dtl.validate_periods(periods)
if freq is None and any(x is None for x in [periods, start, end]):
raise ValueError('Must provide freq argument if no data is '
'supplied')
if com.count_not_none(start, end, periods, freq) != 3:
raise ValueError('Of the four parameters: start, end, periods, '
'and freq, exactly three must be specified')
if start is not None:
start = Timedelta(start)
if end is not None:
end = Timedelta(end)
if start is None and end is None:
if closed is not None:
raise ValueError("Closed has to be None if not both of start"
"and end are defined")
left_closed, right_closed = dtl.validate_endpoints(closed)
if freq is not None:
index = _generate_regular_range(start, end, periods, freq)
else:
index = np.linspace(start.value, end.value, periods).astype('i8')
if not left_closed:
index = index[1:]
if not right_closed:
index = index[:-1]
return cls._simple_new(index, freq=freq)
# ----------------------------------------------------------------
# DatetimeLike Interface
def _unbox_scalar(self, value):
if not isinstance(value, self._scalar_type) and value is not NaT:
raise ValueError("'value' should be a Timedelta.")
self._check_compatible_with(value)
return value.value
def _scalar_from_string(self, value):
return Timedelta(value)
def _check_compatible_with(self, other):
# we don't have anything to validate.
pass
def _maybe_clear_freq(self):
self._freq = None
# ----------------------------------------------------------------
# Array-Like / EA-Interface Methods
@Appender(dtl.DatetimeLikeArrayMixin._validate_fill_value.__doc__)
def _validate_fill_value(self, fill_value):
if isna(fill_value):
fill_value = iNaT
elif isinstance(fill_value, (timedelta, np.timedelta64, Tick)):
fill_value = Timedelta(fill_value).value
else:
raise ValueError("'fill_value' should be a Timedelta. "
"Got '{got}'.".format(got=fill_value))
return fill_value
def astype(self, dtype, copy=True):
# We handle
# --> timedelta64[ns]
# --> timedelta64
# DatetimeLikeArrayMixin super call handles other cases
dtype = pandas_dtype(dtype)
if is_timedelta64_dtype(dtype) and not is_timedelta64_ns_dtype(dtype):
# by pandas convention, converting to non-nano timedelta64
# returns an int64-dtyped array with ints representing multiples
# of the desired timedelta unit. This is essentially division
if self._hasnans:
# avoid double-copying
result = self._data.astype(dtype, copy=False)
values = self._maybe_mask_results(result,
fill_value=None,
convert='float64')
Loading ...