Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

aaronreidsmith / pandas   python

Repository URL to install this package:

Version: 0.25.3 

/ core / arrays / _ranges.py

"""
Helper functions to generate range-like data for DatetimeArray
(and possibly TimedeltaArray/PeriodArray)
"""

from typing import Tuple

import numpy as np

from pandas._libs.tslibs import OutOfBoundsDatetime, Timestamp

from pandas.tseries.offsets import DateOffset, Tick, generate_range


def generate_regular_range(
    start: Timestamp, end: Timestamp, periods: int, freq: DateOffset
) -> Tuple[np.ndarray, str]:
    """
    Build the int64 values of a regularly spaced date range.

    When `freq` is a Tick the values are produced with ``np.arange`` using
    ``freq.nanos`` as the step.  For any other offset the values are
    collected one by one from ``generate_range``.

    Parameters
    ----------
    start : Timestamp or None
        first point of produced date range
    end : Timestamp or None
        last point of produced date range
    periods : int or None
        number of periods in produced date range
    freq : DateOffset
        describes space between dates in produced date range

    Returns
    -------
    values : ndarray[np.int64]
        nanosecond unix timestamps
    tz
        the ``tz`` attribute of `start`, or of `end` when `start` is None.
        In the non-Tick case this is None when both are None.

    Raises
    ------
    ValueError
        If `freq` is a Tick, `periods` is given, and both `start` and `end`
        are None.
    """
    if not isinstance(freq, Tick):
        # start and end should have the same timezone by this point, so
        # whichever one is available supplies the tz
        anchor = start if start is not None else end
        tz = None if anchor is None else anchor.tz

        stamps = generate_range(start=start, end=end, periods=periods, offset=freq)
        return np.array([stamp.value for stamp in stamps], dtype=np.int64), tz

    step = freq.nanos
    if periods is None:
        first = Timestamp(start).value
        # cannot just use stop = Timestamp(end) + 1 because arange breaks
        # when the step is too large, see GH10887.  Instead go to the last
        # whole step at or before `end` and overshoot by half a step.
        whole_steps = (Timestamp(end).value - first) // step
        stop = first + whole_steps * step + step // 2 + 1
        # end.tz == start.tz by this point due to _generate implementation
        tz = start.tz
    elif start is not None:
        first = Timestamp(start).value
        stop = _generate_range_overflow_safe(first, periods, step, side="start")
        tz = start.tz
    elif end is not None:
        # np.arange excludes its stop value, so move one step past `end`
        stop = Timestamp(end).value + step
        first = _generate_range_overflow_safe(stop, periods, step, side="end")
        tz = end.tz
    else:
        raise ValueError(
            "at least 'start' or 'end' should be specified "
            "if a 'period' is given."
        )

    with np.errstate(over="raise"):
        # If the range is sufficiently large, np.arange may overflow
        #  and incorrectly return an empty array if not caught.
        try:
            values = np.arange(first, stop, step, dtype=np.int64)
        except FloatingPointError:
            # Step from `first` until `stop` is hit exactly, keeping every
            # point before it.
            points = []
            current = first
            while current != stop:
                points.append(current)
                current = current + step
            values = np.array(points, dtype=np.int64)

    return values, tz


def _generate_range_overflow_safe(
    endpoint: int, periods: int, stride: int, side: str = "start"
) -> int:
    """
    Calculate the second endpoint for passing to np.arange, checking
    to avoid an integer overflow.  An overflow in ``periods * abs(stride)``
    is caught and re-raised as OutOfBoundsDatetime, with the original
    FloatingPointError attached as its cause.

    Parameters
    ----------
    endpoint : int
        nanosecond timestamp of the known endpoint of the desired range
    periods : int
        number of periods in the desired range
    stride : int
        nanoseconds between periods in the desired range
    side : {'start', 'end'}
        which end of the range `endpoint` refers to

    Returns
    -------
    other_end : int

    Raises
    ------
    OutOfBoundsDatetime
        If ``periods * abs(stride)`` does not fit in uint64, or if the
        requested range cannot avoid overflowing.
    """
    # GH#14187 raise instead of incorrectly wrapping around
    assert side in ["start", "end"]

    i64max = np.uint64(np.iinfo(np.int64).max)
    msg = (
        "Cannot generate range with {side}={endpoint} and "
        "periods={periods}".format(side=side, endpoint=endpoint, periods=periods)
    )

    with np.errstate(over="raise"):
        # if periods * strides cannot be multiplied within the *uint64* bounds,
        #  we cannot salvage the operation by recursing, so raise
        try:
            addend = np.uint64(periods) * np.uint64(np.abs(stride))
        except FloatingPointError as err:
            # chain explicitly so the numpy overflow is kept as the cause
            raise OutOfBoundsDatetime(msg) from err

    # `addend` is unsigned, so it can be compared to the bound directly
    if addend <= i64max:
        # relatively easy case without casting concerns
        return _generate_range_overflow_safe_signed(endpoint, periods, stride, side)

    elif (endpoint > 0 and side == "start" and stride > 0) or (
        endpoint < 0 and side == "end" and stride > 0
    ):
        # no chance of not-overflowing
        raise OutOfBoundsDatetime(msg)

    elif side == "end" and endpoint > i64max and endpoint - stride <= i64max:
        # in generate_regular_range we added `stride` thereby overflowing
        #  the bounds.  Adjust to fix this.
        return _generate_range_overflow_safe(
            endpoint - stride, periods - 1, stride, side
        )

    # split into smaller pieces: cover the first half of the periods to get
    #  a midpoint, then cover the remaining periods starting from there
    mid_periods = periods // 2
    remaining = periods - mid_periods
    assert 0 < remaining < periods, (remaining, periods, endpoint, stride)

    midpoint = _generate_range_overflow_safe(endpoint, mid_periods, stride, side)
    return _generate_range_overflow_safe(midpoint, remaining, stride, side)


def _generate_range_overflow_safe_signed(
    endpoint: int, periods: int, stride: int, side: str
) -> int:
    """
    A special case for _generate_range_overflow_safe where `periods * stride`
    can be calculated without overflowing int64 bounds.

    Parameters
    ----------
    endpoint : int
        nanosecond timestamp of the known endpoint of the desired range
    periods : int
        number of periods in the desired range
    stride : int
        nanoseconds between periods in the desired range
    side : {'start', 'end'}
        which end of the range `endpoint` refers to; for 'end' the stride
        is negated so that the result lies before `endpoint`

    Returns
    -------
    other_end : int
        ``endpoint + periods * stride`` (with the sign of `stride` flipped
        for ``side='end'``) as a built-in int.  May exceed the int64 maximum
        by at most `stride`.

    Raises
    ------
    OutOfBoundsDatetime
        If the other endpoint cannot be represented.
    """
    assert side in ["start", "end"]
    if side == "end":
        stride *= -1

    with np.errstate(over="raise"):
        addend = np.int64(periods) * np.int64(stride)
        try:
            # easy case with no overflows; hand back a built-in int so the
            #  result matches the annotated return type
            return int(np.int64(endpoint) + addend)
        except (FloatingPointError, OverflowError):
            # with endpoint negative and addend positive we risk
            #  FloatingPointError; with reversed signed we risk OverflowError
            pass

        # if stride and endpoint had opposite signs, then endpoint + addend
        #  should never overflow.  so they must have the same signs
        assert (stride > 0 and endpoint >= 0) or (stride < 0 and endpoint <= 0)

        if stride > 0:
            # watch out for very special case in which we just slightly
            #  exceed implementation bounds, but when passing the result to
            #  np.arange will get a result slightly within the bounds
            assert endpoint >= 0
            result = np.uint64(endpoint) + np.uint64(addend)
            i64max = np.uint64(np.iinfo(np.int64).max)
            assert result > i64max
            if result <= i64max + np.uint64(stride):
                return int(result)

    raise OutOfBoundsDatetime(
        "Cannot generate range with "
        "{side}={endpoint} and "
        "periods={periods}".format(side=side, endpoint=endpoint, periods=periods)
    )