Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / pandas   python

Repository URL to install this package:

Version: 1.1.1 

/ tests / frame / test_block_internals.py

from datetime import datetime, timedelta
from io import StringIO
import itertools

import numpy as np
import pytest

import pandas as pd
from pandas import (
    Categorical,
    DataFrame,
    Series,
    Timestamp,
    compat,
    date_range,
    option_context,
)
import pandas._testing as tm
from pandas.core.arrays import IntervalArray, integer_array
from pandas.core.internals import ObjectBlock
from pandas.core.internals.blocks import IntBlock

# Segregated collection of methods that require the BlockManager internal data
# structure


class TestDataFrameBlockInternals:
    def test_setitem_invalidates_datetime_index_freq(self):
        # GH#24096 altering a datetime64tz column inplace invalidates the
        #  `freq` attribute on the underlying DatetimeIndex

        dti = date_range("20130101", periods=3, tz="US/Eastern")
        ts = dti[1]

        df = DataFrame({"B": dti})
        assert df["B"]._values.freq == "D"

        df.iloc[1, 0] = pd.NaT
        assert df["B"]._values.freq is None

        # check that the DatetimeIndex was not altered in place
        assert dti.freq == "D"
        assert dti[1] == ts

    def test_cast_internals(self, float_frame):
        casted = DataFrame(float_frame._mgr, dtype=int)
        expected = DataFrame(float_frame._series, dtype=int)
        tm.assert_frame_equal(casted, expected)

        casted = DataFrame(float_frame._mgr, dtype=np.int32)
        expected = DataFrame(float_frame._series, dtype=np.int32)
        tm.assert_frame_equal(casted, expected)

    def test_consolidate(self, float_frame):
        float_frame["E"] = 7.0
        consolidated = float_frame._consolidate()
        assert len(consolidated._mgr.blocks) == 1

        # Ensure copy, do I want this?
        recons = consolidated._consolidate()
        assert recons is not consolidated
        tm.assert_frame_equal(recons, consolidated)

        float_frame["F"] = 8.0
        assert len(float_frame._mgr.blocks) == 3

        return_value = float_frame._consolidate(inplace=True)
        assert return_value is None
        assert len(float_frame._mgr.blocks) == 1

    def test_consolidate_inplace(self, float_frame):
        frame = float_frame.copy()  # noqa

        # triggers in-place consolidation
        for letter in range(ord("A"), ord("Z")):
            float_frame[chr(letter)] = chr(letter)

    def test_values_consolidate(self, float_frame):
        float_frame["E"] = 7.0
        assert not float_frame._mgr.is_consolidated()
        _ = float_frame.values  # noqa
        assert float_frame._mgr.is_consolidated()

    def test_modify_values(self, float_frame):
        float_frame.values[5] = 5
        assert (float_frame.values[5] == 5).all()

        # unconsolidated
        float_frame["E"] = 7.0
        col = float_frame["E"]
        float_frame.values[6] = 6
        assert (float_frame.values[6] == 6).all()

        # check that item_cache was cleared
        assert float_frame["E"] is not col
        assert (col == 7).all()

    def test_boolean_set_uncons(self, float_frame):
        float_frame["E"] = 7.0

        expected = float_frame.values.copy()
        expected[expected > 1] = 2

        float_frame[float_frame > 1] = 2
        tm.assert_almost_equal(expected, float_frame.values)

    def test_values_numeric_cols(self, float_frame):
        float_frame["foo"] = "bar"

        values = float_frame[["A", "B", "C", "D"]].values
        assert values.dtype == np.float64

    def test_values_lcd(self, mixed_float_frame, mixed_int_frame):

        # mixed lcd
        values = mixed_float_frame[["A", "B", "C", "D"]].values
        assert values.dtype == np.float64

        values = mixed_float_frame[["A", "B", "C"]].values
        assert values.dtype == np.float32

        values = mixed_float_frame[["C"]].values
        assert values.dtype == np.float16

        # GH 10364
        # B uint64 forces float because there are other signed int types
        values = mixed_int_frame[["A", "B", "C", "D"]].values
        assert values.dtype == np.float64

        values = mixed_int_frame[["A", "D"]].values
        assert values.dtype == np.int64

        # B uint64 forces float because there are other signed int types
        values = mixed_int_frame[["A", "B", "C"]].values
        assert values.dtype == np.float64

        # as B and C are both unsigned, no forcing to float is needed
        values = mixed_int_frame[["B", "C"]].values
        assert values.dtype == np.uint64

        values = mixed_int_frame[["A", "C"]].values
        assert values.dtype == np.int32

        values = mixed_int_frame[["C", "D"]].values
        assert values.dtype == np.int64

        values = mixed_int_frame[["A"]].values
        assert values.dtype == np.int32

        values = mixed_int_frame[["C"]].values
        assert values.dtype == np.uint8

    def test_constructor_with_convert(self):
        # this is actually mostly a test of lib.maybe_convert_objects
        # #2845
        df = DataFrame({"A": [2 ** 63 - 1]})
        result = df["A"]
        expected = Series(np.asarray([2 ** 63 - 1], np.int64), name="A")
        tm.assert_series_equal(result, expected)

        df = DataFrame({"A": [2 ** 63]})
        result = df["A"]
        expected = Series(np.asarray([2 ** 63], np.uint64), name="A")
        tm.assert_series_equal(result, expected)

        df = DataFrame({"A": [datetime(2005, 1, 1), True]})
        result = df["A"]
        expected = Series(
            np.asarray([datetime(2005, 1, 1), True], np.object_), name="A"
        )
        tm.assert_series_equal(result, expected)

        df = DataFrame({"A": [None, 1]})
        result = df["A"]
        expected = Series(np.asarray([np.nan, 1], np.float_), name="A")
        tm.assert_series_equal(result, expected)

        df = DataFrame({"A": [1.0, 2]})
        result = df["A"]
        expected = Series(np.asarray([1.0, 2], np.float_), name="A")
        tm.assert_series_equal(result, expected)

        df = DataFrame({"A": [1.0 + 2.0j, 3]})
        result = df["A"]
        expected = Series(np.asarray([1.0 + 2.0j, 3], np.complex_), name="A")
        tm.assert_series_equal(result, expected)

        df = DataFrame({"A": [1.0 + 2.0j, 3.0]})
        result = df["A"]
        expected = Series(np.asarray([1.0 + 2.0j, 3.0], np.complex_), name="A")
        tm.assert_series_equal(result, expected)

        df = DataFrame({"A": [1.0 + 2.0j, True]})
        result = df["A"]
        expected = Series(np.asarray([1.0 + 2.0j, True], np.object_), name="A")
        tm.assert_series_equal(result, expected)

        df = DataFrame({"A": [1.0, None]})
        result = df["A"]
        expected = Series(np.asarray([1.0, np.nan], np.float_), name="A")
        tm.assert_series_equal(result, expected)

        df = DataFrame({"A": [1.0 + 2.0j, None]})
        result = df["A"]
        expected = Series(np.asarray([1.0 + 2.0j, np.nan], np.complex_), name="A")
        tm.assert_series_equal(result, expected)

        df = DataFrame({"A": [2.0, 1, True, None]})
        result = df["A"]
        expected = Series(np.asarray([2.0, 1, True, None], np.object_), name="A")
        tm.assert_series_equal(result, expected)

        df = DataFrame({"A": [2.0, 1, datetime(2006, 1, 1), None]})
        result = df["A"]
        expected = Series(
            np.asarray([2.0, 1, datetime(2006, 1, 1), None], np.object_), name="A"
        )
        tm.assert_series_equal(result, expected)

    def test_construction_with_mixed(self, float_string_frame):
        # test construction edge cases with mixed types

        # f7u12, this does not work without extensive workaround
        data = [
            [datetime(2001, 1, 5), np.nan, datetime(2001, 1, 2)],
            [datetime(2000, 1, 2), datetime(2000, 1, 3), datetime(2000, 1, 1)],
        ]
        df = DataFrame(data)

        # check dtypes
        result = df.dtypes
        expected = Series({"datetime64[ns]": 3})

        # mixed-type frames
        float_string_frame["datetime"] = datetime.now()
        float_string_frame["timedelta"] = timedelta(days=1, seconds=1)
        assert float_string_frame["datetime"].dtype == "M8[ns]"
        assert float_string_frame["timedelta"].dtype == "m8[ns]"
        result = float_string_frame.dtypes
        expected = Series(
            [np.dtype("float64")] * 4
            + [
                np.dtype("object"),
                np.dtype("datetime64[ns]"),
                np.dtype("timedelta64[ns]"),
            ],
            index=list("ABCD") + ["foo", "datetime", "timedelta"],
        )
        tm.assert_series_equal(result, expected)

    def test_construction_with_conversions(self):

        # convert from a numpy array of non-ns timedelta64
        arr = np.array([1, 2, 3], dtype="timedelta64[s]")
        df = DataFrame(index=range(3))
        df["A"] = arr
        expected = DataFrame(
            {"A": pd.timedelta_range("00:00:01", periods=3, freq="s")}, index=range(3)
        )
        tm.assert_frame_equal(df, expected)

        expected = DataFrame(
            {
                "dt1": Timestamp("20130101"),
                "dt2": date_range("20130101", periods=3),
                # 'dt3' : date_range('20130101 00:00:01',periods=3,freq='s'),
            },
            index=range(3),
        )

        df = DataFrame(index=range(3))
        df["dt1"] = np.datetime64("2013-01-01")
        df["dt2"] = np.array(
            ["2013-01-01", "2013-01-02", "2013-01-03"], dtype="datetime64[D]"
        )

        # df['dt3'] = np.array(['2013-01-01 00:00:01','2013-01-01
        # 00:00:02','2013-01-01 00:00:03'],dtype='datetime64[s]')

        tm.assert_frame_equal(df, expected)

    def test_constructor_compound_dtypes(self):
        # GH 5191
        # compound dtypes should raise not-implementederror

        def f(dtype):
            data = list(itertools.repeat((datetime(2001, 1, 1), "aa", 20), 9))
            return DataFrame(data=data, columns=["A", "B", "C"], dtype=dtype)

        msg = "compound dtypes are not implemented in the DataFrame constructor"
        with pytest.raises(NotImplementedError, match=msg):
            f([("A", "datetime64[h]"), ("B", "str"), ("C", "int32")])

        # these work (though results may be unexpected)
        f("int64")
        f("float64")

        # 10822
        # invalid error message on dt inference
        if not compat.is_platform_windows():
            f("M8[ns]")

    def test_equals_different_blocks(self):
        # GH 9330
        df0 = pd.DataFrame({"A": ["x", "y"], "B": [1, 2], "C": ["w", "z"]})
        df1 = df0.reset_index()[["A", "B", "C"]]
        # this assert verifies that the above operations have
        # induced a block rearrangement
        assert df0._mgr.blocks[0].dtype != df1._mgr.blocks[0].dtype

        # do the real tests
        tm.assert_frame_equal(df0, df1)
        assert df0.equals(df1)
        assert df1.equals(df0)

    def test_copy_blocks(self, float_frame):
        # API/ENH 9607
        df = DataFrame(float_frame, copy=True)
        column = df.columns[0]

        # use the default copy=True, change a column
        blocks = df._to_dict_of_blocks(copy=True)
        for dtype, _df in blocks.items():
            if column in _df:
                _df.loc[:, column] = _df[column] + 1

        # make sure we did not change the original DataFrame
        assert not _df[column].equals(df[column])

    def test_no_copy_blocks(self, float_frame):
        # API/ENH 9607
        df = DataFrame(float_frame, copy=True)
        column = df.columns[0]

        # use the copy=False, change a column
        blocks = df._to_dict_of_blocks(copy=False)
        for dtype, _df in blocks.items():
            if column in _df:
                _df.loc[:, column] = _df[column] + 1

        # make sure we did change the original DataFrame
        assert _df[column].equals(df[column])

    def test_copy(self, float_frame, float_string_frame):
        cop = float_frame.copy()
        cop["E"] = cop["A"]
        assert "E" not in float_frame

        # copy objects
        copy = float_string_frame.copy()
        assert copy._mgr is not float_string_frame._mgr

    def test_pickle(self, float_string_frame, timezone_frame):
        empty_frame = DataFrame()

        unpickled = tm.round_trip_pickle(float_string_frame)
        tm.assert_frame_equal(float_string_frame, unpickled)

        # buglet
        float_string_frame._mgr.ndim

        # empty
        unpickled = tm.round_trip_pickle(empty_frame)
        repr(unpickled)

        # tz frame
        unpickled = tm.round_trip_pickle(timezone_frame)
        tm.assert_frame_equal(timezone_frame, unpickled)

    def test_consolidate_datetime64(self):
        # numpy vstack bug

        data = (
            "starting,ending,measure\n"
            "2012-06-21 00:00,2012-06-23 07:00,77\n"
            "2012-06-23 07:00,2012-06-23 16:30,65\n"
            "2012-06-23 16:30,2012-06-25 08:00,77\n"
            "2012-06-25 08:00,2012-06-26 12:00,0\n"
            "2012-06-26 12:00,2012-06-27 08:00,77\n"
        )
        df = pd.read_csv(StringIO(data), parse_dates=[0, 1])

        ser_starting = df.starting
        ser_starting.index = ser_starting.values
        ser_starting = ser_starting.tz_localize("US/Eastern")
        ser_starting = ser_starting.tz_convert("UTC")
        ser_starting.index.name = "starting"

        ser_ending = df.ending
        ser_ending.index = ser_ending.values
        ser_ending = ser_ending.tz_localize("US/Eastern")
        ser_ending = ser_ending.tz_convert("UTC")
        ser_ending.index.name = "ending"

        df.starting = ser_starting.index
        df.ending = ser_ending.index

        tm.assert_index_equal(pd.DatetimeIndex(df.starting), ser_starting.index)
        tm.assert_index_equal(pd.DatetimeIndex(df.ending), ser_ending.index)

    def test_is_mixed_type(self, float_frame, float_string_frame):
        assert not float_frame._is_mixed_type
        assert float_string_frame._is_mixed_type

    def test_get_numeric_data(self):

        datetime64name = np.dtype("M8[ns]").name
        objectname = np.dtype(np.object_).name

        df = DataFrame(
            {"a": 1.0, "b": 2, "c": "foo", "f": Timestamp("20010102")},
            index=np.arange(10),
        )
        result = df.dtypes
        expected = Series(
            [
                np.dtype("float64"),
                np.dtype("int64"),
                np.dtype(objectname),
                np.dtype(datetime64name),
            ],
            index=["a", "b", "c", "f"],
        )
        tm.assert_series_equal(result, expected)

        df = DataFrame(
            {
                "a": 1.0,
                "b": 2,
                "c": "foo",
                "d": np.array([1.0] * 10, dtype="float32"),
                "e": np.array([1] * 10, dtype="int32"),
                "f": np.array([1] * 10, dtype="int16"),
                "g": Timestamp("20010102"),
            },
            index=np.arange(10),
        )

        result = df._get_numeric_data()
        expected = df.loc[:, ["a", "b", "d", "e", "f"]]
        tm.assert_frame_equal(result, expected)

        only_obj = df.loc[:, ["c", "g"]]
        result = only_obj._get_numeric_data()
        expected = df.loc[:, []]
        tm.assert_frame_equal(result, expected)

        df = DataFrame.from_dict({"a": [1, 2], "b": ["foo", "bar"], "c": [np.pi, np.e]})
        result = df._get_numeric_data()
        expected = DataFrame.from_dict({"a": [1, 2], "c": [np.pi, np.e]})
        tm.assert_frame_equal(result, expected)

        df = result.copy()
        result = df._get_numeric_data()
        expected = df
        tm.assert_frame_equal(result, expected)

    def test_get_numeric_data_extension_dtype(self):
        # GH 22290
        df = DataFrame(
            {
                "A": integer_array([-10, np.nan, 0, 10, 20, 30], dtype="Int64"),
                "B": Categorical(list("abcabc")),
                "C": integer_array([0, 1, 2, 3, np.nan, 5], dtype="UInt8"),
                "D": IntervalArray.from_breaks(range(7)),
            }
        )
        result = df._get_numeric_data()
        expected = df.loc[:, ["A", "C"]]
        tm.assert_frame_equal(result, expected)

    def test_convert_objects(self, float_string_frame):

        oops = float_string_frame.T.T
        converted = oops._convert(datetime=True)
        tm.assert_frame_equal(converted, float_string_frame)
        assert converted["A"].dtype == np.float64

        # force numeric conversion
        float_string_frame["H"] = "1."
        float_string_frame["I"] = "1"

        # add in some items that will be nan
        length = len(float_string_frame)
        float_string_frame["J"] = "1."
        float_string_frame["K"] = "1"
        float_string_frame.loc[float_string_frame.index[0:5], ["J", "K"]] = "garbled"
        converted = float_string_frame._convert(datetime=True, numeric=True)
        assert converted["H"].dtype == "float64"
        assert converted["I"].dtype == "int64"
        assert converted["J"].dtype == "float64"
        assert converted["K"].dtype == "float64"
        assert len(converted["J"].dropna()) == length - 5
        assert len(converted["K"].dropna()) == length - 5

        # via astype
        converted = float_string_frame.copy()
        converted["H"] = converted["H"].astype("float64")
        converted["I"] = converted["I"].astype("int64")
        assert converted["H"].dtype == "float64"
        assert converted["I"].dtype == "int64"

        # via astype, but errors
        converted = float_string_frame.copy()
        with pytest.raises(ValueError, match="invalid literal"):
            converted["H"].astype("int32")

        # mixed in a single column
        df = DataFrame(dict(s=Series([1, "na", 3, 4])))
        result = df._convert(datetime=True, numeric=True)
        expected = DataFrame(dict(s=Series([1, np.nan, 3, 4])))
        tm.assert_frame_equal(result, expected)

    def test_convert_objects_no_conversion(self):
        mixed1 = DataFrame({"a": [1, 2, 3], "b": [4.0, 5, 6], "c": ["x", "y", "z"]})
        mixed2 = mixed1._convert(datetime=True)
        tm.assert_frame_equal(mixed1, mixed2)

    def test_infer_objects(self):
        # GH 11221
        df = DataFrame(
            {
                "a": ["a", 1, 2, 3],
                "b": ["b", 2.0, 3.0, 4.1],
                "c": [
                    "c",
                    datetime(2016, 1, 1),
                    datetime(2016, 1, 2),
                    datetime(2016, 1, 3),
                ],
                "d": [1, 2, 3, "d"],
            },
            columns=["a", "b", "c", "d"],
        )
        df = df.iloc[1:].infer_objects()

        assert df["a"].dtype == "int64"
        assert df["b"].dtype == "float64"
        assert df["c"].dtype == "M8[ns]"
        assert df["d"].dtype == "object"

        expected = DataFrame(
            {
                "a": [1, 2, 3],
                "b": [2.0, 3.0, 4.1],
                "c": [datetime(2016, 1, 1), datetime(2016, 1, 2), datetime(2016, 1, 3)],
                "d": [2, 3, "d"],
            },
            columns=["a", "b", "c", "d"],
        )
        # reconstruct frame to verify inference is same
        tm.assert_frame_equal(df.reset_index(drop=True), expected)

    def test_stale_cached_series_bug_473(self):

        # this is chained, but ok
        with option_context("chained_assignment", None):
            Y = DataFrame(
                np.random.random((4, 4)),
                index=("a", "b", "c", "d"),
                columns=("e", "f", "g", "h"),
            )
            repr(Y)
            Y["e"] = Y["e"].astype("object")
            Y["g"]["c"] = np.NaN
            repr(Y)
            result = Y.sum()  # noqa
            exp = Y["g"].sum()  # noqa
            assert pd.isna(Y["g"]["c"])

    def test_get_X_columns(self):
        # numeric and object columns

        df = DataFrame(
            {
                "a": [1, 2, 3],
                "b": [True, False, True],
                "c": ["foo", "bar", "baz"],
                "d": [None, None, None],
                "e": [3.14, 0.577, 2.773],
            }
        )

        tm.assert_index_equal(df._get_numeric_data().columns, pd.Index(["a", "b", "e"]))

    def test_strange_column_corruption_issue(self):
        # FIXME: dont leave commented-out
        # (wesm) Unclear how exactly this is related to internal matters
        df = DataFrame(index=[0, 1])
        df[0] = np.nan
        wasCol = {}

        for i, dt in enumerate(df.index):
            for col in range(100, 200):
                if col not in wasCol:
                    wasCol[col] = 1
                    df[col] = np.nan
                df[col][dt] = i

        myid = 100

        first = len(df.loc[pd.isna(df[myid]), [myid]])
        second = len(df.loc[pd.isna(df[myid]), [myid]])
        assert first == second == 0

    def test_constructor_no_pandas_array(self):
        # Ensure that PandasArray isn't allowed inside Series
        # See https://github.com/pandas-dev/pandas/issues/23995 for more.
        arr = pd.Series([1, 2, 3]).array
        result = pd.DataFrame({"A": arr})
        expected = pd.DataFrame({"A": [1, 2, 3]})
        tm.assert_frame_equal(result, expected)
        assert isinstance(result._mgr.blocks[0], IntBlock)

    def test_add_column_with_pandas_array(self):
        # GH 26390
        df = pd.DataFrame({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "d"]})
        df["c"] = pd.arrays.PandasArray(np.array([1, 2, None, 3], dtype=object))
        df2 = pd.DataFrame(
            {
                "a": [1, 2, 3, 4],
                "b": ["a", "b", "c", "d"],
                "c": pd.arrays.PandasArray(np.array([1, 2, None, 3], dtype=object)),
            }
        )
        assert type(df["c"]._mgr.blocks[0]) == ObjectBlock
        assert type(df2["c"]._mgr.blocks[0]) == ObjectBlock
        tm.assert_frame_equal(df, df2)