Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

aaronreidsmith / pandas   python

Repository URL to install this package:

Version: 0.25.3 

/ tests / reshape / merge / test_multi.py

from collections import OrderedDict

import numpy as np
from numpy import nan
from numpy.random import randn
import pytest

import pandas as pd
from pandas import DataFrame, Index, MultiIndex, Series
from pandas.core.reshape.concat import concat
from pandas.core.reshape.merge import merge
import pandas.util.testing as tm


@pytest.fixture
def left():
    """left dataframe (not multi-indexed) for multi-index join tests"""
    # a little relevant example with NAs
    key1 = ["bar", "bar", "bar", "foo", "foo", "baz", "baz", "qux", "qux", "snap"]
    key2 = ["two", "one", "three", "one", "two", "one", "two", "two", "three", "one"]

    data = np.random.randn(len(key1))
    return DataFrame({"key1": key1, "key2": key2, "data": data})


@pytest.fixture
def right():
    """right dataframe (multi-indexed) for multi-index join tests"""
    index = MultiIndex(
        levels=[["foo", "bar", "baz", "qux"], ["one", "two", "three"]],
        codes=[[0, 0, 0, 1, 1, 2, 2, 3, 3, 3], [0, 1, 2, 0, 1, 1, 2, 0, 1, 2]],
        names=["key1", "key2"],
    )

    return DataFrame(
        np.random.randn(10, 3), index=index, columns=["j_one", "j_two", "j_three"]
    )


@pytest.fixture
def left_multi():
    return DataFrame(
        dict(
            Origin=["A", "A", "B", "B", "C"],
            Destination=["A", "B", "A", "C", "A"],
            Period=["AM", "AM", "IP", "AM", "OP"],
            TripPurp=["hbw", "nhb", "hbo", "nhb", "hbw"],
            Trips=[1987, 3647, 2470, 4296, 4444],
        ),
        columns=["Origin", "Destination", "Period", "TripPurp", "Trips"],
    ).set_index(["Origin", "Destination", "Period", "TripPurp"])


@pytest.fixture
def right_multi():
    return DataFrame(
        dict(
            Origin=["A", "A", "B", "B", "C", "C", "E"],
            Destination=["A", "B", "A", "B", "A", "B", "F"],
            Period=["AM", "AM", "IP", "AM", "OP", "IP", "AM"],
            LinkType=["a", "b", "c", "b", "a", "b", "a"],
            Distance=[100, 80, 90, 80, 75, 35, 55],
        ),
        columns=["Origin", "Destination", "Period", "LinkType", "Distance"],
    ).set_index(["Origin", "Destination", "Period", "LinkType"])


@pytest.fixture
def on_cols_multi():
    return ["Origin", "Destination", "Period"]


@pytest.fixture
def idx_cols_multi():
    return ["Origin", "Destination", "Period", "TripPurp", "LinkType"]


class TestMergeMulti:
    def setup_method(self):
        self.index = MultiIndex(
            levels=[["foo", "bar", "baz", "qux"], ["one", "two", "three"]],
            codes=[[0, 0, 0, 1, 1, 2, 2, 3, 3, 3], [0, 1, 2, 0, 1, 1, 2, 0, 1, 2]],
            names=["first", "second"],
        )
        self.to_join = DataFrame(
            np.random.randn(10, 3),
            index=self.index,
            columns=["j_one", "j_two", "j_three"],
        )

        # a little relevant example with NAs
        key1 = ["bar", "bar", "bar", "foo", "foo", "baz", "baz", "qux", "qux", "snap"]
        key2 = [
            "two",
            "one",
            "three",
            "one",
            "two",
            "one",
            "two",
            "two",
            "three",
            "one",
        ]

        data = np.random.randn(len(key1))
        self.data = DataFrame({"key1": key1, "key2": key2, "data": data})

    def test_merge_on_multikey(self, left, right, join_type):
        on_cols = ["key1", "key2"]
        result = left.join(right, on=on_cols, how=join_type).reset_index(drop=True)

        expected = pd.merge(left, right.reset_index(), on=on_cols, how=join_type)

        tm.assert_frame_equal(result, expected)

        result = left.join(right, on=on_cols, how=join_type, sort=True).reset_index(
            drop=True
        )

        expected = pd.merge(
            left, right.reset_index(), on=on_cols, how=join_type, sort=True
        )

        tm.assert_frame_equal(result, expected)

    @pytest.mark.parametrize("sort", [False, True])
    def test_left_join_multi_index(self, left, right, sort):
        icols = ["1st", "2nd", "3rd"]

        def bind_cols(df):
            iord = lambda a: 0 if a != a else ord(a)
            f = lambda ts: ts.map(iord) - ord("a")
            return f(df["1st"]) + f(df["3rd"]) * 1e2 + df["2nd"].fillna(0) * 1e4

        def run_asserts(left, right, sort):
            res = left.join(right, on=icols, how="left", sort=sort)

            assert len(left) < len(res) + 1
            assert not res["4th"].isna().any()
            assert not res["5th"].isna().any()

            tm.assert_series_equal(res["4th"], -res["5th"], check_names=False)
            result = bind_cols(res.iloc[:, :-2])
            tm.assert_series_equal(res["4th"], result, check_names=False)
            assert result.name is None

            if sort:
                tm.assert_frame_equal(res, res.sort_values(icols, kind="mergesort"))

            out = merge(left, right.reset_index(), on=icols, sort=sort, how="left")

            res.index = np.arange(len(res))
            tm.assert_frame_equal(out, res)

        lc = list(map(chr, np.arange(ord("a"), ord("z") + 1)))
        left = DataFrame(np.random.choice(lc, (5000, 2)), columns=["1st", "3rd"])
        left.insert(1, "2nd", np.random.randint(0, 1000, len(left)))

        i = np.random.permutation(len(left))
        right = left.iloc[i].copy()

        left["4th"] = bind_cols(left)
        right["5th"] = -bind_cols(right)
        right.set_index(icols, inplace=True)

        run_asserts(left, right, sort)

        # inject some nulls
        left.loc[1::23, "1st"] = np.nan
        left.loc[2::37, "2nd"] = np.nan
        left.loc[3::43, "3rd"] = np.nan
        left["4th"] = bind_cols(left)

        i = np.random.permutation(len(left))
        right = left.iloc[i, :-1]
        right["5th"] = -bind_cols(right)
        right.set_index(icols, inplace=True)

        run_asserts(left, right, sort)

    @pytest.mark.parametrize("sort", [False, True])
    def test_merge_right_vs_left(self, left, right, sort):
        # compare left vs right merge with multikey
        on_cols = ["key1", "key2"]
        merged_left_right = left.merge(
            right, left_on=on_cols, right_index=True, how="left", sort=sort
        )

        merge_right_left = right.merge(
            left, right_on=on_cols, left_index=True, how="right", sort=sort
        )

        # Reorder columns
        merge_right_left = merge_right_left[merged_left_right.columns]

        tm.assert_frame_equal(merged_left_right, merge_right_left)

    def test_compress_group_combinations(self):

        # ~ 40000000 possible unique groups
        key1 = tm.rands_array(10, 10000)
        key1 = np.tile(key1, 2)
        key2 = key1[::-1]

        df = DataFrame({"key1": key1, "key2": key2, "value1": np.random.randn(20000)})

        df2 = DataFrame(
            {"key1": key1[::2], "key2": key2[::2], "value2": np.random.randn(10000)}
        )

        # just to hit the label compression code path
        merge(df, df2, how="outer")

    def test_left_join_index_preserve_order(self):

        on_cols = ["k1", "k2"]
        left = DataFrame(
            {
                "k1": [0, 1, 2] * 8,
                "k2": ["foo", "bar"] * 12,
                "v": np.array(np.arange(24), dtype=np.int64),
            }
        )

        index = MultiIndex.from_tuples([(2, "bar"), (1, "foo")])
        right = DataFrame({"v2": [5, 7]}, index=index)

        result = left.join(right, on=on_cols)

        expected = left.copy()
        expected["v2"] = np.nan
        expected.loc[(expected.k1 == 2) & (expected.k2 == "bar"), "v2"] = 5
        expected.loc[(expected.k1 == 1) & (expected.k2 == "foo"), "v2"] = 7

        tm.assert_frame_equal(result, expected)

        result.sort_values(on_cols, kind="mergesort", inplace=True)
        expected = left.join(right, on=on_cols, sort=True)

        tm.assert_frame_equal(result, expected)

        # test join with multi dtypes blocks
        left = DataFrame(
            {
                "k1": [0, 1, 2] * 8,
                "k2": ["foo", "bar"] * 12,
                "k3": np.array([0, 1, 2] * 8, dtype=np.float32),
                "v": np.array(np.arange(24), dtype=np.int32),
            }
        )

        index = MultiIndex.from_tuples([(2, "bar"), (1, "foo")])
        right = DataFrame({"v2": [5, 7]}, index=index)

        result = left.join(right, on=on_cols)

        expected = left.copy()
        expected["v2"] = np.nan
        expected.loc[(expected.k1 == 2) & (expected.k2 == "bar"), "v2"] = 5
        expected.loc[(expected.k1 == 1) & (expected.k2 == "foo"), "v2"] = 7

        tm.assert_frame_equal(result, expected)

        result = result.sort_values(on_cols, kind="mergesort")
        expected = left.join(right, on=on_cols, sort=True)

        tm.assert_frame_equal(result, expected)

    def test_left_join_index_multi_match_multiindex(self):
        left = DataFrame(
            [
                ["X", "Y", "C", "a"],
                ["W", "Y", "C", "e"],
                ["V", "Q", "A", "h"],
                ["V", "R", "D", "i"],
                ["X", "Y", "D", "b"],
                ["X", "Y", "A", "c"],
                ["W", "Q", "B", "f"],
                ["W", "R", "C", "g"],
                ["V", "Y", "C", "j"],
                ["X", "Y", "B", "d"],
            ],
            columns=["cola", "colb", "colc", "tag"],
            index=[3, 2, 0, 1, 7, 6, 4, 5, 9, 8],
        )

        right = DataFrame(
            [
                ["W", "R", "C", 0],
                ["W", "Q", "B", 3],
                ["W", "Q", "B", 8],
                ["X", "Y", "A", 1],
                ["X", "Y", "A", 4],
                ["X", "Y", "B", 5],
                ["X", "Y", "C", 6],
                ["X", "Y", "C", 9],
                ["X", "Q", "C", -6],
                ["X", "R", "C", -9],
                ["V", "Y", "C", 7],
                ["V", "R", "D", 2],
                ["V", "R", "D", -1],
                ["V", "Q", "A", -3],
            ],
            columns=["col1", "col2", "col3", "val"],
        ).set_index(["col1", "col2", "col3"])

        result = left.join(right, on=["cola", "colb", "colc"], how="left")

        expected = DataFrame(
            [
                ["X", "Y", "C", "a", 6],
                ["X", "Y", "C", "a", 9],
                ["W", "Y", "C", "e", nan],
                ["V", "Q", "A", "h", -3],
                ["V", "R", "D", "i", 2],
                ["V", "R", "D", "i", -1],
                ["X", "Y", "D", "b", nan],
                ["X", "Y", "A", "c", 1],
                ["X", "Y", "A", "c", 4],
                ["W", "Q", "B", "f", 3],
                ["W", "Q", "B", "f", 8],
                ["W", "R", "C", "g", 0],
                ["V", "Y", "C", "j", 7],
                ["X", "Y", "B", "d", 5],
            ],
            columns=["cola", "colb", "colc", "tag", "val"],
            index=[3, 3, 2, 0, 1, 1, 7, 6, 6, 4, 4, 5, 9, 8],
        )

        tm.assert_frame_equal(result, expected)

        result = left.join(right, on=["cola", "colb", "colc"], how="left", sort=True)

        expected = expected.sort_values(["cola", "colb", "colc"], kind="mergesort")

        tm.assert_frame_equal(result, expected)

    def test_left_join_index_multi_match(self):
        left = DataFrame(
            [["c", 0], ["b", 1], ["a", 2], ["b", 3]],
            columns=["tag", "val"],
            index=[2, 0, 1, 3],
        )

        right = DataFrame(
            [
                ["a", "v"],
                ["c", "w"],
                ["c", "x"],
                ["d", "y"],
                ["a", "z"],
                ["c", "r"],
                ["e", "q"],
                ["c", "s"],
            ],
            columns=["tag", "char"],
        ).set_index("tag")

        result = left.join(right, on="tag", how="left")

        expected = DataFrame(
            [
                ["c", 0, "w"],
                ["c", 0, "x"],
                ["c", 0, "r"],
                ["c", 0, "s"],
                ["b", 1, nan],
                ["a", 2, "v"],
                ["a", 2, "z"],
                ["b", 3, nan],
            ],
            columns=["tag", "val", "char"],
            index=[2, 2, 2, 2, 0, 1, 1, 3],
        )

        tm.assert_frame_equal(result, expected)

        result = left.join(right, on="tag", how="left", sort=True)
        expected2 = expected.sort_values("tag", kind="mergesort")

        tm.assert_frame_equal(result, expected2)

        # GH7331 - maintain left frame order in left merge
        result = merge(left, right.reset_index(), how="left", on="tag")
        expected.index = np.arange(len(expected))
        tm.assert_frame_equal(result, expected)

    def test_left_merge_na_buglet(self):
        left = DataFrame(
            {
                "id": list("abcde"),
                "v1": randn(5),
                "v2": randn(5),
                "dummy": list("abcde"),
                "v3": randn(5),
            },
            columns=["id", "v1", "v2", "dummy", "v3"],
        )
        right = DataFrame(
            {
                "id": ["a", "b", np.nan, np.nan, np.nan],
                "sv3": [1.234, 5.678, np.nan, np.nan, np.nan],
            }
        )

        result = merge(left, right, on="id", how="left")

        rdf = right.drop(["id"], axis=1)
        expected = left.join(rdf)
        tm.assert_frame_equal(result, expected)

    def test_merge_na_keys(self):
        data = [
            [1950, "A", 1.5],
            [1950, "B", 1.5],
            [1955, "B", 1.5],
            [1960, "B", np.nan],
            [1970, "B", 4.0],
            [1950, "C", 4.0],
            [1960, "C", np.nan],
            [1965, "C", 3.0],
            [1970, "C", 4.0],
        ]

        frame = DataFrame(data, columns=["year", "panel", "data"])

        other_data = [
            [1960, "A", np.nan],
            [1970, "A", np.nan],
            [1955, "A", np.nan],
            [1965, "A", np.nan],
            [1965, "B", np.nan],
            [1955, "C", np.nan],
        ]
        other = DataFrame(other_data, columns=["year", "panel", "data"])

        result = frame.merge(other, how="outer")

        expected = frame.fillna(-999).merge(other.fillna(-999), how="outer")
        expected = expected.replace(-999, np.nan)

        tm.assert_frame_equal(result, expected)

    @pytest.mark.parametrize("klass", [None, np.asarray, Series, Index])
    def test_merge_datetime_index(self, klass):
        # see gh-19038
        df = DataFrame(
            [1, 2, 3], ["2016-01-01", "2017-01-01", "2018-01-01"], columns=["a"]
        )
        df.index = pd.to_datetime(df.index)
        on_vector = df.index.year

        if klass is not None:
            on_vector = klass(on_vector)

        expected = DataFrame(
            OrderedDict([("a", [1, 2, 3]), ("key_1", [2016, 2017, 2018])])
        )

        result = df.merge(df, on=["a", on_vector], how="inner")
        tm.assert_frame_equal(result, expected)

        expected = DataFrame(
            OrderedDict(
                [("key_0", [2016, 2017, 2018]), ("a_x", [1, 2, 3]), ("a_y", [1, 2, 3])]
            )
        )

        result = df.merge(df, on=[df.index.year], how="inner")
        tm.assert_frame_equal(result, expected)

    def test_join_multi_levels(self):

        # GH 3662
        # merge multi-levels
        household = DataFrame(
            dict(
                household_id=[1, 2, 3],
                male=[0, 1, 0],
                wealth=[196087.3, 316478.7, 294750],
            ),
            columns=["household_id", "male", "wealth"],
        ).set_index("household_id")
        portfolio = DataFrame(
            dict(
                household_id=[1, 2, 2, 3, 3, 3, 4],
                asset_id=[
                    "nl0000301109",
                    "nl0000289783",
                    "gb00b03mlx29",
                    "gb00b03mlx29",
                    "lu0197800237",
                    "nl0000289965",
                    np.nan,
                ],
                name=[
                    "ABN Amro",
                    "Robeco",
                    "Royal Dutch Shell",
                    "Royal Dutch Shell",
                    "AAB Eastern Europe Equity Fund",
                    "Postbank BioTech Fonds",
                    np.nan,
                ],
                share=[1.0, 0.4, 0.6, 0.15, 0.6, 0.25, 1.0],
            ),
            columns=["household_id", "asset_id", "name", "share"],
        ).set_index(["household_id", "asset_id"])
        result = household.join(portfolio, how="inner")
        expected = (
            DataFrame(
                dict(
                    male=[0, 1, 1, 0, 0, 0],
                    wealth=[196087.3, 316478.7, 316478.7, 294750.0, 294750.0, 294750.0],
                    name=[
                        "ABN Amro",
                        "Robeco",
                        "Royal Dutch Shell",
                        "Royal Dutch Shell",
                        "AAB Eastern Europe Equity Fund",
                        "Postbank BioTech Fonds",
                    ],
                    share=[1.00, 0.40, 0.60, 0.15, 0.60, 0.25],
                    household_id=[1, 2, 2, 3, 3, 3],
                    asset_id=[
                        "nl0000301109",
                        "nl0000289783",
                        "gb00b03mlx29",
                        "gb00b03mlx29",
                        "lu0197800237",
                        "nl0000289965",
                    ],
                )
            )
            .set_index(["household_id", "asset_id"])
            .reindex(columns=["male", "wealth", "name", "share"])
        )
        tm.assert_frame_equal(result, expected)

        # equivalency
        result = merge(
            household.reset_index(),
            portfolio.reset_index(),
            on=["household_id"],
            how="inner",
        ).set_index(["household_id", "asset_id"])
        tm.assert_frame_equal(result, expected)

        result = household.join(portfolio, how="outer")
        expected = concat(
            [
                expected,
                (
                    DataFrame(
                        dict(share=[1.00]),
                        index=MultiIndex.from_tuples(
                            [(4, np.nan)], names=["household_id", "asset_id"]
                        ),
                    )
                ),
            ],
            axis=0,
            sort=True,
        ).reindex(columns=expected.columns)
        tm.assert_frame_equal(result, expected)

        # invalid cases
        household.index.name = "foo"

        with pytest.raises(ValueError):
            household.join(portfolio, how="inner")

        portfolio2 = portfolio.copy()
        portfolio2.index.set_names(["household_id", "foo"])

        with pytest.raises(ValueError):
            portfolio2.join(portfolio, how="inner")

    def test_join_multi_levels2(self):

        # some more advanced merges
        # GH6360
        household = DataFrame(
            dict(
                household_id=[1, 2, 2, 3, 3, 3, 4],
                asset_id=[
                    "nl0000301109",
                    "nl0000301109",
                    "gb00b03mlx29",
                    "gb00b03mlx29",
                    "lu0197800237",
                    "nl0000289965",
                    np.nan,
                ],
                share=[1.0, 0.4, 0.6, 0.15, 0.6, 0.25, 1.0],
            ),
            columns=["household_id", "asset_id", "share"],
        ).set_index(["household_id", "asset_id"])

        log_return = DataFrame(
            dict(
                asset_id=[
                    "gb00b03mlx29",
                    "gb00b03mlx29",
                    "gb00b03mlx29",
                    "lu0197800237",
                    "lu0197800237",
                ],
                t=[233, 234, 235, 180, 181],
                log_return=[0.09604978, -0.06524096, 0.03532373, 0.03025441, 0.036997],
            )
        ).set_index(["asset_id", "t"])

        expected = (
            DataFrame(
                dict(
                    household_id=[2, 2, 2, 3, 3, 3, 3, 3],
                    asset_id=[
                        "gb00b03mlx29",
                        "gb00b03mlx29",
                        "gb00b03mlx29",
                        "gb00b03mlx29",
                        "gb00b03mlx29",
                        "gb00b03mlx29",
                        "lu0197800237",
                        "lu0197800237",
                    ],
                    t=[233, 234, 235, 233, 234, 235, 180, 181],
                    share=[0.6, 0.6, 0.6, 0.15, 0.15, 0.15, 0.6, 0.6],
                    log_return=[
                        0.09604978,
                        -0.06524096,
                        0.03532373,
                        0.09604978,
                        -0.06524096,
                        0.03532373,
                        0.03025441,
                        0.036997,
                    ],
                )
            )
            .set_index(["household_id", "asset_id", "t"])
            .reindex(columns=["share", "log_return"])
        )

        # this is the equivalency
        result = merge(
            household.reset_index(),
            log_return.reset_index(),
            on=["asset_id"],
            how="inner",
        ).set_index(["household_id", "asset_id", "t"])
        tm.assert_frame_equal(result, expected)

        expected = (
            DataFrame(
                dict(
                    household_id=[1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 4],
                    asset_id=[
                        "nl0000301109",
                        "nl0000301109",
                        "gb00b03mlx29",
                        "gb00b03mlx29",
                        "gb00b03mlx29",
                        "gb00b03mlx29",
                        "gb00b03mlx29",
                        "gb00b03mlx29",
                        "lu0197800237",
                        "lu0197800237",
                        "nl0000289965",
                        None,
                    ],
                    t=[None, None, 233, 234, 235, 233, 234, 235, 180, 181, None, None],
                    share=[
                        1.0,
                        0.4,
                        0.6,
                        0.6,
                        0.6,
                        0.15,
                        0.15,
                        0.15,
                        0.6,
                        0.6,
                        0.25,
                        1.0,
                    ],
                    log_return=[
                        None,
                        None,
                        0.09604978,
                        -0.06524096,
                        0.03532373,
                        0.09604978,
                        -0.06524096,
                        0.03532373,
                        0.03025441,
                        0.036997,
                        None,
                        None,
                    ],
                )
            )
            .set_index(["household_id", "asset_id", "t"])
            .reindex(columns=["share", "log_return"])
        )

        result = merge(
            household.reset_index(),
            log_return.reset_index(),
            on=["asset_id"],
            how="outer",
        ).set_index(["household_id", "asset_id", "t"])

        tm.assert_frame_equal(result, expected)


class TestJoinMultiMulti:
    def test_join_multi_multi(
        self, left_multi, right_multi, join_type, on_cols_multi, idx_cols_multi
    ):
        # Multi-index join tests
        expected = (
            pd.merge(
                left_multi.reset_index(),
                right_multi.reset_index(),
                how=join_type,
                on=on_cols_multi,
            )
            .set_index(idx_cols_multi)
            .sort_index()
        )

        result = left_multi.join(right_multi, how=join_type).sort_index()
        tm.assert_frame_equal(result, expected)

    def test_join_multi_empty_frames(
        self, left_multi, right_multi, join_type, on_cols_multi, idx_cols_multi
    ):

        left_multi = left_multi.drop(columns=left_multi.columns)
        right_multi = right_multi.drop(columns=right_multi.columns)

        expected = (
            pd.merge(
                left_multi.reset_index(),
                right_multi.reset_index(),
                how=join_type,
                on=on_cols_multi,
            )
            .set_index(idx_cols_multi)
            .sort_index()
        )

        result = left_multi.join(right_multi, how=join_type).sort_index()
        tm.assert_frame_equal(result, expected)

    @pytest.mark.parametrize("box", [None, np.asarray, Series, Index])
    def test_merge_datetime_index(self, box):
        # see gh-19038
        df = DataFrame(
            [1, 2, 3], ["2016-01-01", "2017-01-01", "2018-01-01"], columns=["a"]
        )
        df.index = pd.to_datetime(df.index)
        on_vector = df.index.year

        if box is not None:
            on_vector = box(on_vector)

        expected = DataFrame(
            OrderedDict([("a", [1, 2, 3]), ("key_1", [2016, 2017, 2018])])
        )

        result = df.merge(df, on=["a", on_vector], how="inner")
        tm.assert_frame_equal(result, expected)

        expected = DataFrame(
            OrderedDict(
                [("key_0", [2016, 2017, 2018]), ("a_x", [1, 2, 3]), ("a_y", [1, 2, 3])]
            )
        )

        result = df.merge(df, on=[df.index.year], how="inner")
        tm.assert_frame_equal(result, expected)

    def test_single_common_level(self):
        index_left = pd.MultiIndex.from_tuples(
            [("K0", "X0"), ("K0", "X1"), ("K1", "X2")], names=["key", "X"]
        )

        left = pd.DataFrame(
            {"A": ["A0", "A1", "A2"], "B": ["B0", "B1", "B2"]}, index=index_left
        )

        index_right = pd.MultiIndex.from_tuples(
            [("K0", "Y0"), ("K1", "Y1"), ("K2", "Y2"), ("K2", "Y3")], names=["key", "Y"]
        )

        right = pd.DataFrame(
            {"C": ["C0", "C1", "C2", "C3"], "D": ["D0", "D1", "D2", "D3"]},
            index=index_right,
        )

        result = left.join(right)
        expected = pd.merge(
            left.reset_index(), right.reset_index(), on=["key"], how="inner"
        ).set_index(["key", "X", "Y"])

        tm.assert_frame_equal(result, expected)