# pylint: disable=E1103
from warnings import catch_warnings
import numpy as np
from numpy.random import randn
import pytest
from pandas._libs import join as libjoin
import pandas.compat as compat
from pandas.compat import lrange
import pandas as pd
from pandas import DataFrame, Index, MultiIndex, Series, concat, merge
from pandas.tests.reshape.merge.test_merge import NGROUPS, N, get_test_data
import pandas.util.testing as tm
from pandas.util.testing import assert_frame_equal
a_ = np.array
@pytest.mark.filterwarnings("ignore:\\nPanel:FutureWarning")
class TestJoin(object):
def setup_method(self, method):
# aggregate multiple columns
self.df = DataFrame({'key1': get_test_data(),
'key2': get_test_data(),
'data1': np.random.randn(N),
'data2': np.random.randn(N)})
# exclude a couple keys for fun
self.df = self.df[self.df['key2'] > 1]
self.df2 = DataFrame({'key1': get_test_data(n=N // 5),
'key2': get_test_data(ngroups=NGROUPS // 2,
n=N // 5),
'value': np.random.randn(N // 5)})
index, data = tm.getMixedTypeDict()
self.target = DataFrame(data, index=index)
# Join on string value
self.source = DataFrame({'MergedA': data['A'], 'MergedD': data['D']},
index=data['C'])
def test_cython_left_outer_join(self):
left = a_([0, 1, 2, 1, 2, 0, 0, 1, 2, 3, 3], dtype=np.int64)
right = a_([1, 1, 0, 4, 2, 2, 1], dtype=np.int64)
max_group = 5
ls, rs = libjoin.left_outer_join(left, right, max_group)
exp_ls = left.argsort(kind='mergesort')
exp_rs = right.argsort(kind='mergesort')
exp_li = a_([0, 1, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5,
6, 6, 7, 7, 8, 8, 9, 10])
exp_ri = a_([0, 0, 0, 1, 2, 3, 1, 2, 3, 1, 2, 3,
4, 5, 4, 5, 4, 5, -1, -1])
exp_ls = exp_ls.take(exp_li)
exp_ls[exp_li == -1] = -1
exp_rs = exp_rs.take(exp_ri)
exp_rs[exp_ri == -1] = -1
tm.assert_numpy_array_equal(ls, exp_ls, check_dtype=False)
tm.assert_numpy_array_equal(rs, exp_rs, check_dtype=False)
def test_cython_right_outer_join(self):
left = a_([0, 1, 2, 1, 2, 0, 0, 1, 2, 3, 3], dtype=np.int64)
right = a_([1, 1, 0, 4, 2, 2, 1], dtype=np.int64)
max_group = 5
rs, ls = libjoin.left_outer_join(right, left, max_group)
exp_ls = left.argsort(kind='mergesort')
exp_rs = right.argsort(kind='mergesort')
# 0 1 1 1
exp_li = a_([0, 1, 2, 3, 4, 5, 3, 4, 5, 3, 4, 5,
# 2 2 4
6, 7, 8, 6, 7, 8, -1])
exp_ri = a_([0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3,
4, 4, 4, 5, 5, 5, 6])
exp_ls = exp_ls.take(exp_li)
exp_ls[exp_li == -1] = -1
exp_rs = exp_rs.take(exp_ri)
exp_rs[exp_ri == -1] = -1
tm.assert_numpy_array_equal(ls, exp_ls, check_dtype=False)
tm.assert_numpy_array_equal(rs, exp_rs, check_dtype=False)
def test_cython_inner_join(self):
left = a_([0, 1, 2, 1, 2, 0, 0, 1, 2, 3, 3], dtype=np.int64)
right = a_([1, 1, 0, 4, 2, 2, 1, 4], dtype=np.int64)
max_group = 5
ls, rs = libjoin.inner_join(left, right, max_group)
exp_ls = left.argsort(kind='mergesort')
exp_rs = right.argsort(kind='mergesort')
exp_li = a_([0, 1, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5,
6, 6, 7, 7, 8, 8])
exp_ri = a_([0, 0, 0, 1, 2, 3, 1, 2, 3, 1, 2, 3,
4, 5, 4, 5, 4, 5])
exp_ls = exp_ls.take(exp_li)
exp_ls[exp_li == -1] = -1
exp_rs = exp_rs.take(exp_ri)
exp_rs[exp_ri == -1] = -1
tm.assert_numpy_array_equal(ls, exp_ls, check_dtype=False)
tm.assert_numpy_array_equal(rs, exp_rs, check_dtype=False)
def test_left_outer_join(self):
joined_key2 = merge(self.df, self.df2, on='key2')
_check_join(self.df, self.df2, joined_key2, ['key2'], how='left')
joined_both = merge(self.df, self.df2)
_check_join(self.df, self.df2, joined_both, ['key1', 'key2'],
how='left')
def test_right_outer_join(self):
joined_key2 = merge(self.df, self.df2, on='key2', how='right')
_check_join(self.df, self.df2, joined_key2, ['key2'], how='right')
joined_both = merge(self.df, self.df2, how='right')
_check_join(self.df, self.df2, joined_both, ['key1', 'key2'],
how='right')
def test_full_outer_join(self):
joined_key2 = merge(self.df, self.df2, on='key2', how='outer')
_check_join(self.df, self.df2, joined_key2, ['key2'], how='outer')
joined_both = merge(self.df, self.df2, how='outer')
_check_join(self.df, self.df2, joined_both, ['key1', 'key2'],
how='outer')
def test_inner_join(self):
joined_key2 = merge(self.df, self.df2, on='key2', how='inner')
_check_join(self.df, self.df2, joined_key2, ['key2'], how='inner')
joined_both = merge(self.df, self.df2, how='inner')
_check_join(self.df, self.df2, joined_both, ['key1', 'key2'],
how='inner')
def test_handle_overlap(self):
joined = merge(self.df, self.df2, on='key2',
suffixes=['.foo', '.bar'])
assert 'key1.foo' in joined
assert 'key1.bar' in joined
def test_handle_overlap_arbitrary_key(self):
joined = merge(self.df, self.df2,
left_on='key2', right_on='key1',
suffixes=['.foo', '.bar'])
assert 'key1.foo' in joined
assert 'key2.bar' in joined
def test_join_on(self):
target = self.target
source = self.source
merged = target.join(source, on='C')
tm.assert_series_equal(merged['MergedA'], target['A'],
check_names=False)
tm.assert_series_equal(merged['MergedD'], target['D'],
check_names=False)
# join with duplicates (fix regression from DataFrame/Matrix merge)
df = DataFrame({'key': ['a', 'a', 'b', 'b', 'c']})
df2 = DataFrame({'value': [0, 1, 2]}, index=['a', 'b', 'c'])
joined = df.join(df2, on='key')
expected = DataFrame({'key': ['a', 'a', 'b', 'b', 'c'],
'value': [0, 0, 1, 1, 2]})
assert_frame_equal(joined, expected)
# Test when some are missing
df_a = DataFrame([[1], [2], [3]], index=['a', 'b', 'c'],
columns=['one'])
df_b = DataFrame([['foo'], ['bar']], index=[1, 2],
columns=['two'])
df_c = DataFrame([[1], [2]], index=[1, 2],
columns=['three'])
joined = df_a.join(df_b, on='one')
joined = joined.join(df_c, on='one')
assert np.isnan(joined['two']['c'])
assert np.isnan(joined['three']['c'])
# merge column not p resent
with pytest.raises(KeyError, match="^'E'$"):
target.join(source, on='E')
# overlap
source_copy = source.copy()
source_copy['A'] = 0
msg = ("You are trying to merge on float64 and object columns. If"
" you wish to proceed you should use pd.concat")
with pytest.raises(ValueError, match=msg):
target.join(source_copy, on='A')
def test_join_on_fails_with_different_right_index(self):
df = DataFrame({'a': np.random.choice(['m', 'f'], size=3),
'b': np.random.randn(3)})
df2 = DataFrame({'a': np.random.choice(['m', 'f'], size=10),
'b': np.random.randn(10)},
index=tm.makeCustomIndex(10, 2))
msg = (r'len\(left_on\) must equal the number of levels in the index'
' of "right"')
with pytest.raises(ValueError, match=msg):
merge(df, df2, left_on='a', right_index=True)
def test_join_on_fails_with_different_left_index(self):
df = DataFrame({'a': np.random.choice(['m', 'f'], size=3),
'b': np.random.randn(3)},
index=tm.makeCustomIndex(3, 2))
df2 = DataFrame({'a': np.random.choice(['m', 'f'], size=10),
'b': np.random.randn(10)})
msg = (r'len\(right_on\) must equal the number of levels in the index'
' of "left"')
with pytest.raises(ValueError, match=msg):
merge(df, df2, right_on='b', left_index=True)
def test_join_on_fails_with_different_column_counts(self):
df = DataFrame({'a': np.random.choice(['m', 'f'], size=3),
'b': np.random.randn(3)})
df2 = DataFrame({'a': np.random.choice(['m', 'f'], size=10),
'b': np.random.randn(10)},
index=tm.makeCustomIndex(10, 2))
msg = r"len\(right_on\) must equal len\(left_on\)"
with pytest.raises(ValueError, match=msg):
merge(df, df2, right_on='a', left_on=['a', 'b'])
@pytest.mark.parametrize("wrong_type", [2, 'str', None, np.array([0, 1])])
def test_join_on_fails_with_wrong_object_type(self, wrong_type):
# GH12081 - original issue
# GH21220 - merging of Series and DataFrame is now allowed
# Edited test to remove the Series object from test parameters
df = DataFrame({'a': [1, 1]})
msg = ("Can only merge Series or DataFrame objects, a {} was passed"
.format(str(type(wrong_type))))
with pytest.raises(TypeError, match=msg):
merge(wrong_type, df, left_on='a', right_on='a')
with pytest.raises(TypeError, match=msg):
merge(df, wrong_type, left_on='a', right_on='a')
def test_join_on_pass_vector(self):
expected = self.target.join(self.source, on='C')
del expected['C']
join_col = self.target.pop('C')
result = self.target.join(self.source, on=join_col)
assert_frame_equal(result, expected)
def test_join_with_len0(self):
# nothing to merge
merged = self.target.join(self.source.reindex([]), on='C')
for col in self.source:
assert col in merged
assert merged[col].isna().all()
merged2 = self.target.join(self.source.reindex([]), on='C',
how='inner')
tm.assert_index_equal(merged2.columns, merged.columns)
assert len(merged2) == 0
def test_join_on_inner(self):
df = DataFrame({'key': ['a', 'a', 'd', 'b', 'b', 'c']})
df2 = DataFrame({'value': [0, 1]}, index=['a', 'b'])
joined = df.join(df2, on='key', how='inner')
expected = df.join(df2, on='key')
expected = expected[expected['value'].notna()]
tm.assert_series_equal(joined['key'], expected['key'],
check_dtype=False)
tm.assert_series_equal(joined['value'], expected['value'],
check_dtype=False)
tm.assert_index_equal(joined.index, expected.index)
def test_join_on_singlekey_list(self):
df = DataFrame({'key': ['a', 'a', 'b', 'b', 'c']})
df2 = DataFrame({'value': [0, 1, 2]}, index=['a', 'b', 'c'])
# corner cases
joined = df.join(df2, on=['key'])
expected = df.join(df2, on='key')
assert_frame_equal(joined, expected)
def test_join_on_series(self):
result = self.target.join(self.source['MergedA'], on='C')
expected = self.target.join(self.source[['MergedA']], on='C')
assert_frame_equal(result, expected)
def test_join_on_series_buglet(self):
# GH #638
df = DataFrame({'a': [1, 1]})
ds = Series([2], index=[1], name='b')
result = df.join(ds, on='a')
expected = DataFrame({'a': [1, 1],
'b': [2, 2]}, index=df.index)
tm.assert_frame_equal(result, expected)
def test_join_index_mixed(self, join_type):
# no overlapping blocks
df1 = DataFrame(index=np.arange(10))
df1['bool'] = True
df1['string'] = 'foo'
df2 = DataFrame(index=np.arange(5, 15))
df2['int'] = 1
df2['float'] = 1.
joined = df1.join(df2, how=join_type)
expected = _join_by_hand(df1, df2, how=join_type)
assert_frame_equal(joined, expected)
joined = df2.join(df1, how=join_type)
expected = _join_by_hand(df2, df1, how=join_type)
assert_frame_equal(joined, expected)
def test_join_index_mixed_overlap(self):
df1 = DataFrame({'A': 1., 'B': 2, 'C': 'foo', 'D': True},
index=np.arange(10),
columns=['A', 'B', 'C', 'D'])
assert df1['B'].dtype == np.int64
assert df1['D'].dtype == np.bool_
df2 = DataFrame({'A': 1., 'B': 2, 'C': 'foo', 'D': True},
index=np.arange(0, 10, 2),
columns=['A', 'B', 'C', 'D'])
# overlap
joined = df1.join(df2, lsuffix='_one', rsuffix='_two')
expected_columns = ['A_one', 'B_one', 'C_one', 'D_one',
Loading ...