Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / pandas   python

Repository URL to install this package:

/ core / arrays / _ranges.py

# -*- coding: utf-8 -*-
"""
Helper functions to generate range-like data for DatetimeArray
(and possibly TimedeltaArray/PeriodArray)
"""

import numpy as np

from pandas._libs.tslibs import OutOfBoundsDatetime, Timestamp

from pandas.tseries.offsets import Tick, generate_range


def generate_regular_range(start, end, periods, freq):
    """
    Build the int64 nanosecond values of an evenly spaced date range.

    Fixed-width ``Tick`` frequencies are expanded arithmetically with
    ``np.arange``; every other offset is expanded by iterating over
    ``generate_range``.

    Parameters
    ----------
    start : Timestamp or None
        first point of produced date range
    end : Timestamp or None
        last point of produced date range
    periods : int or None
        number of periods in produced date range; when None (Tick case)
        both `start` and `end` bound the range
    freq : DateOffset
        describes space between dates in produced date range

    Returns
    -------
    values : ndarray[np.int64]
        nanosecond unix timestamps
    tz : tzinfo or None
        ``start.tz`` when `start` is usable, otherwise ``end.tz``; None if
        neither endpoint is given in the non-Tick case

    Raises
    ------
    ValueError
        If `freq` is a Tick, `periods` is given, and both `start` and
        `end` are None.
    """
    if not isinstance(freq, Tick):
        # start and end should have the same timezone by this point, so
        #  whichever endpoint is available supplies the tz
        if start is not None:
            tz = start.tz
        elif end is not None:
            tz = end.tz
        else:
            tz = None

        stamps = generate_range(start=start, end=end,
                                periods=periods, offset=freq)
        values = np.array([stamp.value for stamp in stamps], dtype=np.int64)
        return values, tz

    step = freq.nanos
    if periods is None:
        first = Timestamp(start).value
        # cannot just use stop = Timestamp(end) + 1 because arange breaks
        #  when the step is too large, see GH10887
        whole_steps = (Timestamp(end).value - first) // step
        stop = first + whole_steps * step + step // 2 + 1
        # end.tz == start.tz by this point due to _generate implementation
        tz = start.tz
    elif start is not None:
        first = Timestamp(start).value
        stop = _generate_range_overflow_safe(first, periods, step,
                                             side='start')
        tz = start.tz
    elif end is not None:
        # arange excludes its stop value, so push it one step past `end`
        stop = Timestamp(end).value + step
        first = _generate_range_overflow_safe(stop, periods, step,
                                              side='end')
        tz = end.tz
    else:
        raise ValueError("at least 'start' or 'end' should be specified "
                         "if a 'period' is given.")

    with np.errstate(over="raise"):
        # If the range is sufficiently large, np.arange may overflow
        #  and incorrectly return an empty array if not caught.
        try:
            values = np.arange(first, stop, step, dtype=np.int64)
        except FloatingPointError:
            # walk the range by hand, keeping every point before `stop`
            points = []
            cursor = first
            while cursor != stop:
                points.append(cursor)
                cursor = cursor + step
            values = np.array(points, dtype=np.int64)

    return values, tz


def _generate_range_overflow_safe(endpoint, periods, stride, side='start'):
    """
    Calculate the second endpoint for passing to np.arange, checking
    to avoid an integer overflow.  Catch OverflowError and re-raise
    as OutOfBoundsDatetime.

    Parameters
    ----------
    endpoint : int
        nanosecond timestamp of the known endpoint of the desired range
    periods : int
        number of periods in the desired range
    stride : int
        nanoseconds between periods in the desired range
    side : {'start', 'end'}
        which end of the range `endpoint` refers to

    Returns
    -------
    other_end : int

    Raises
    ------
    OutOfBoundsDatetime
    """
    # GH#14187 raise instead of incorrectly wrapping around
    assert side in ['start', 'end']

    i64max = np.uint64(np.iinfo(np.int64).max)
    msg = ('Cannot generate range with {side}={endpoint} and '
           'periods={periods}'
           .format(side=side, endpoint=endpoint, periods=periods))

    with np.errstate(over="raise"):
        # if periods * strides cannot be multiplied within the *uint64* bounds,
        #  we cannot salvage the operation by recursing, so raise
        try:
            addend = np.uint64(periods) * np.uint64(np.abs(stride))
        except FloatingPointError:
            raise OutOfBoundsDatetime(msg)

    if np.abs(addend) <= i64max:
        # relatively easy case without casting concerns
        return _generate_range_overflow_safe_signed(
            endpoint, periods, stride, side)

    elif ((endpoint > 0 and side == 'start' and stride > 0) or
          (endpoint < 0 and side == 'end' and stride > 0)):
        # no chance of not-overflowing
        raise OutOfBoundsDatetime(msg)

    elif (side == 'end' and endpoint > i64max and endpoint - stride <= i64max):
        # in _generate_regular_range we added `stride` thereby overflowing
        #  the bounds.  Adjust to fix this.
        return _generate_range_overflow_safe(endpoint - stride,
                                             periods - 1, stride, side)

    # split into smaller pieces
    mid_periods = periods // 2
    remaining = periods - mid_periods
    assert 0 < remaining < periods, (remaining, periods, endpoint, stride)

    midpoint = _generate_range_overflow_safe(endpoint, mid_periods,
                                             stride, side)
    return _generate_range_overflow_safe(midpoint, remaining, stride, side)


def _generate_range_overflow_safe_signed(endpoint, periods, stride, side):
    """
    A special case for _generate_range_overflow_safe where `periods * stride`
    can be calculated without overflowing int64 bounds.
    """
    assert side in ['start', 'end']
    if side == 'end':
        stride *= -1

    with np.errstate(over="raise"):
        addend = np.int64(periods) * np.int64(stride)
        try:
            # easy case with no overflows
            return np.int64(endpoint) + addend
        except (FloatingPointError, OverflowError):
            # with endpoint negative and addend positive we risk
            #  FloatingPointError; with reversed signed we risk OverflowError
            pass

        # if stride and endpoint had opposite signs, then endpoint + addend
        #  should never overflow.  so they must have the same signs
        assert (stride > 0 and endpoint >= 0) or (stride < 0 and endpoint <= 0)

        if stride > 0:
            # watch out for very special case in which we just slightly
            #  exceed implementation bounds, but when passing the result to
            #  np.arange will get a result slightly within the bounds
            assert endpoint >= 0
            result = np.uint64(endpoint) + np.uint64(addend)
            i64max = np.uint64(np.iinfo(np.int64).max)
            assert result > i64max
            if result <= i64max + np.uint64(stride):
                return result

    raise OutOfBoundsDatetime('Cannot generate range with '
                              '{side}={endpoint} and '
                              'periods={periods}'
                              .format(side=side, endpoint=endpoint,
                                      periods=periods))