Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / pandas   python

Repository URL to install this package:

Version: 0.24.2 

/ tests / reshape / merge / test_merge.py

# pylint: disable=E1103

from collections import OrderedDict
from datetime import date, datetime
import random
import re

import numpy as np
from numpy import nan
import pytest

from pandas.compat import lrange

from pandas.core.dtypes.common import is_categorical_dtype, is_object_dtype
from pandas.core.dtypes.dtypes import CategoricalDtype

import pandas as pd
from pandas import (
    Categorical, CategoricalIndex, DataFrame, DatetimeIndex, Float64Index,
    Int64Index, MultiIndex, RangeIndex, Series, UInt64Index)
from pandas.api.types import CategoricalDtype as CDT
from pandas.core.reshape.concat import concat
from pandas.core.reshape.merge import MergeError, merge
import pandas.util.testing as tm
from pandas.util.testing import assert_frame_equal, assert_series_equal

N = 50
NGROUPS = 8


def get_test_data(ngroups=NGROUPS, n=N):
    unique_groups = lrange(ngroups)
    arr = np.asarray(np.tile(unique_groups, n // ngroups))

    if len(arr) < n:
        arr = np.asarray(list(arr) + unique_groups[:n - len(arr)])

    random.shuffle(arr)
    return arr


def get_series():
    return [
        pd.Series([1], dtype='int64'),
        pd.Series([1], dtype='Int64'),
        pd.Series([1.23]),
        pd.Series(['foo']),
        pd.Series([True]),
        pd.Series([pd.Timestamp('2018-01-01')]),
        pd.Series([pd.Timestamp('2018-01-01', tz='US/Eastern')]),
    ]


def get_series_na():
    return [
        pd.Series([np.nan], dtype='Int64'),
        pd.Series([np.nan], dtype='float'),
        pd.Series([np.nan], dtype='object'),
        pd.Series([pd.NaT]),
    ]


@pytest.fixture(params=get_series(), ids=lambda x: x.dtype.name)
def series_of_dtype(request):
    """
    A parametrized fixture returning a variety of Series of different
    dtypes
    """
    return request.param


@pytest.fixture(params=get_series(), ids=lambda x: x.dtype.name)
def series_of_dtype2(request):
    """
    A duplicate of the series_of_dtype fixture, so that it can be used
    twice by a single function
    """
    return request.param


@pytest.fixture(params=get_series_na(), ids=lambda x: x.dtype.name)
def series_of_dtype_all_na(request):
    """
    A parametrized fixture returning a variety of Series with all NA
    values
    """
    return request.param


class TestMerge(object):

    def setup_method(self, method):
        # aggregate multiple columns
        self.df = DataFrame({'key1': get_test_data(),
                             'key2': get_test_data(),
                             'data1': np.random.randn(N),
                             'data2': np.random.randn(N)})

        # exclude a couple keys for fun
        self.df = self.df[self.df['key2'] > 1]

        self.df2 = DataFrame({'key1': get_test_data(n=N // 5),
                              'key2': get_test_data(ngroups=NGROUPS // 2,
                                                    n=N // 5),
                              'value': np.random.randn(N // 5)})

        self.left = DataFrame({'key': ['a', 'b', 'c', 'd', 'e', 'e', 'a'],
                               'v1': np.random.randn(7)})
        self.right = DataFrame({'v2': np.random.randn(4)},
                               index=['d', 'b', 'c', 'a'])

    def test_merge_inner_join_empty(self):
        # GH 15328
        df_empty = pd.DataFrame()
        df_a = pd.DataFrame({'a': [1, 2]}, index=[0, 1], dtype='int64')
        result = pd.merge(df_empty, df_a, left_index=True, right_index=True)
        expected = pd.DataFrame({'a': []}, index=[], dtype='int64')
        assert_frame_equal(result, expected)

    def test_merge_common(self):
        joined = merge(self.df, self.df2)
        exp = merge(self.df, self.df2, on=['key1', 'key2'])
        tm.assert_frame_equal(joined, exp)

    def test_merge_index_as_on_arg(self):
        # GH14355

        left = self.df.set_index('key1')
        right = self.df2.set_index('key1')
        result = merge(left, right, on='key1')
        expected = merge(self.df, self.df2, on='key1').set_index('key1')
        assert_frame_equal(result, expected)

    def test_merge_index_singlekey_right_vs_left(self):
        left = DataFrame({'key': ['a', 'b', 'c', 'd', 'e', 'e', 'a'],
                          'v1': np.random.randn(7)})
        right = DataFrame({'v2': np.random.randn(4)},
                          index=['d', 'b', 'c', 'a'])

        merged1 = merge(left, right, left_on='key',
                        right_index=True, how='left', sort=False)
        merged2 = merge(right, left, right_on='key',
                        left_index=True, how='right', sort=False)
        assert_frame_equal(merged1, merged2.loc[:, merged1.columns])

        merged1 = merge(left, right, left_on='key',
                        right_index=True, how='left', sort=True)
        merged2 = merge(right, left, right_on='key',
                        left_index=True, how='right', sort=True)
        assert_frame_equal(merged1, merged2.loc[:, merged1.columns])

    def test_merge_index_singlekey_inner(self):
        left = DataFrame({'key': ['a', 'b', 'c', 'd', 'e', 'e', 'a'],
                          'v1': np.random.randn(7)})
        right = DataFrame({'v2': np.random.randn(4)},
                          index=['d', 'b', 'c', 'a'])

        # inner join
        result = merge(left, right, left_on='key', right_index=True,
                       how='inner')
        expected = left.join(right, on='key').loc[result.index]
        assert_frame_equal(result, expected)

        result = merge(right, left, right_on='key', left_index=True,
                       how='inner')
        expected = left.join(right, on='key').loc[result.index]
        assert_frame_equal(result, expected.loc[:, result.columns])

    def test_merge_misspecified(self):
        msg = "Must pass right_on or right_index=True"
        with pytest.raises(pd.errors.MergeError, match=msg):
            merge(self.left, self.right, left_index=True)
        msg = "Must pass left_on or left_index=True"
        with pytest.raises(pd.errors.MergeError, match=msg):
            merge(self.left, self.right, right_index=True)

        msg = ('Can only pass argument "on" OR "left_on" and "right_on", not'
               ' a combination of both')
        with pytest.raises(pd.errors.MergeError, match=msg):
            merge(self.left, self.left, left_on='key', on='key')

        msg = r"len\(right_on\) must equal len\(left_on\)"
        with pytest.raises(ValueError, match=msg):
            merge(self.df, self.df2, left_on=['key1'],
                  right_on=['key1', 'key2'])

    def test_index_and_on_parameters_confusion(self):
        msg = ("right_index parameter must be of type bool, not"
               r" <(class|type) 'list'>")
        with pytest.raises(ValueError, match=msg):
            merge(self.df, self.df2, how='left',
                  left_index=False, right_index=['key1', 'key2'])
        msg = ("left_index parameter must be of type bool, not "
               r"<(class|type) 'list'>")
        with pytest.raises(ValueError, match=msg):
            merge(self.df, self.df2, how='left',
                  left_index=['key1', 'key2'], right_index=False)
        with pytest.raises(ValueError, match=msg):
            merge(self.df, self.df2, how='left',
                  left_index=['key1', 'key2'], right_index=['key1', 'key2'])

    def test_merge_overlap(self):
        merged = merge(self.left, self.left, on='key')
        exp_len = (self.left['key'].value_counts() ** 2).sum()
        assert len(merged) == exp_len
        assert 'v1_x' in merged
        assert 'v1_y' in merged

    def test_merge_different_column_key_names(self):
        left = DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'],
                          'value': [1, 2, 3, 4]})
        right = DataFrame({'rkey': ['foo', 'bar', 'qux', 'foo'],
                           'value': [5, 6, 7, 8]})

        merged = left.merge(right, left_on='lkey', right_on='rkey',
                            how='outer', sort=True)

        exp = pd.Series(['bar', 'baz', 'foo', 'foo', 'foo', 'foo', np.nan],
                        name='lkey')
        tm.assert_series_equal(merged['lkey'], exp)

        exp = pd.Series(['bar', np.nan, 'foo', 'foo', 'foo', 'foo', 'qux'],
                        name='rkey')
        tm.assert_series_equal(merged['rkey'], exp)

        exp = pd.Series([2, 3, 1, 1, 4, 4, np.nan], name='value_x')
        tm.assert_series_equal(merged['value_x'], exp)

        exp = pd.Series([6, np.nan, 5, 8, 5, 8, 7], name='value_y')
        tm.assert_series_equal(merged['value_y'], exp)

    def test_merge_copy(self):
        left = DataFrame({'a': 0, 'b': 1}, index=lrange(10))
        right = DataFrame({'c': 'foo', 'd': 'bar'}, index=lrange(10))

        merged = merge(left, right, left_index=True,
                       right_index=True, copy=True)

        merged['a'] = 6
        assert (left['a'] == 0).all()

        merged['d'] = 'peekaboo'
        assert (right['d'] == 'bar').all()

    def test_merge_nocopy(self):
        left = DataFrame({'a': 0, 'b': 1}, index=lrange(10))
        right = DataFrame({'c': 'foo', 'd': 'bar'}, index=lrange(10))

        merged = merge(left, right, left_index=True,
                       right_index=True, copy=False)

        merged['a'] = 6
        assert (left['a'] == 6).all()

        merged['d'] = 'peekaboo'
        assert (right['d'] == 'peekaboo').all()

    def test_intelligently_handle_join_key(self):
        # #733, be a bit more 1337 about not returning unconsolidated DataFrame

        left = DataFrame({'key': [1, 1, 2, 2, 3],
                          'value': lrange(5)}, columns=['value', 'key'])
        right = DataFrame({'key': [1, 1, 2, 3, 4, 5],
                           'rvalue': lrange(6)})

        joined = merge(left, right, on='key', how='outer')
        expected = DataFrame({'key': [1, 1, 1, 1, 2, 2, 3, 4, 5],
                              'value': np.array([0, 0, 1, 1, 2, 3, 4,
                                                 np.nan, np.nan]),
                              'rvalue': [0, 1, 0, 1, 2, 2, 3, 4, 5]},
                             columns=['value', 'key', 'rvalue'])
        assert_frame_equal(joined, expected)

    def test_merge_join_key_dtype_cast(self):
        # #8596

        df1 = DataFrame({'key': [1], 'v1': [10]})
        df2 = DataFrame({'key': [2], 'v1': [20]})
        df = merge(df1, df2, how='outer')
        assert df['key'].dtype == 'int64'

        df1 = DataFrame({'key': [True], 'v1': [1]})
        df2 = DataFrame({'key': [False], 'v1': [0]})
        df = merge(df1, df2, how='outer')

        # GH13169
        # this really should be bool
        assert df['key'].dtype == 'object'

        df1 = DataFrame({'val': [1]})
        df2 = DataFrame({'val': [2]})
        lkey = np.array([1])
        rkey = np.array([2])
        df = merge(df1, df2, left_on=lkey, right_on=rkey, how='outer')
        assert df['key_0'].dtype == 'int64'

    def test_handle_join_key_pass_array(self):
        left = DataFrame({'key': [1, 1, 2, 2, 3],
                          'value': lrange(5)}, columns=['value', 'key'])
        right = DataFrame({'rvalue': lrange(6)})
        key = np.array([1, 1, 2, 3, 4, 5])

        merged = merge(left, right, left_on='key', right_on=key, how='outer')
        merged2 = merge(right, left, left_on=key, right_on='key', how='outer')

        assert_series_equal(merged['key'], merged2['key'])
        assert merged['key'].notna().all()
        assert merged2['key'].notna().all()

        left = DataFrame({'value': lrange(5)}, columns=['value'])
        right = DataFrame({'rvalue': lrange(6)})
        lkey = np.array([1, 1, 2, 2, 3])
        rkey = np.array([1, 1, 2, 3, 4, 5])

        merged = merge(left, right, left_on=lkey, right_on=rkey, how='outer')
        tm.assert_series_equal(merged['key_0'], Series([1, 1, 1, 1, 2,
                                                        2, 3, 4, 5],
                                                       name='key_0'))

        left = DataFrame({'value': lrange(3)})
        right = DataFrame({'rvalue': lrange(6)})

        key = np.array([0, 1, 1, 2, 2, 3], dtype=np.int64)
        merged = merge(left, right, left_index=True, right_on=key, how='outer')
        tm.assert_series_equal(merged['key_0'], Series(key, name='key_0'))

    def test_no_overlap_more_informative_error(self):
        dt = datetime.now()
        df1 = DataFrame({'x': ['a']}, index=[dt])

        df2 = DataFrame({'y': ['b', 'c']}, index=[dt, dt])

        msg = ('No common columns to perform merge on. '
               'Merge options: left_on={lon}, right_on={ron}, '
               'left_index={lidx}, right_index={ridx}'
               .format(lon=None, ron=None, lidx=False, ridx=False))

        with pytest.raises(MergeError, match=msg):
            merge(df1, df2)

    def test_merge_non_unique_indexes(self):

        dt = datetime(2012, 5, 1)
        dt2 = datetime(2012, 5, 2)
        dt3 = datetime(2012, 5, 3)
Loading ...