# pylint: disable=E1103
from collections import OrderedDict
from datetime import date, datetime
import random
import re
import numpy as np
from numpy import nan
import pytest
from pandas.compat import lrange
from pandas.core.dtypes.common import is_categorical_dtype, is_object_dtype
from pandas.core.dtypes.dtypes import CategoricalDtype
import pandas as pd
from pandas import (
Categorical, CategoricalIndex, DataFrame, DatetimeIndex, Float64Index,
Int64Index, MultiIndex, RangeIndex, Series, UInt64Index)
from pandas.api.types import CategoricalDtype as CDT
from pandas.core.reshape.concat import concat
from pandas.core.reshape.merge import MergeError, merge
import pandas.util.testing as tm
from pandas.util.testing import assert_frame_equal, assert_series_equal
# Default number of rows produced by get_test_data(); also used to size the
# random columns of the frames built in TestMerge.setup_method.
N = 50
# Default number of distinct group labels produced by get_test_data().
NGROUPS = 8
def get_test_data(ngroups=NGROUPS, n=N):
    """
    Build a shuffled array of ``n`` group labels taken from
    ``0 .. ngroups - 1``.

    The labels are tiled ``n // ngroups`` times; when that leaves the
    array shorter than ``n`` it is topped up with the leading labels.
    The array is shuffled in place with ``random.shuffle`` and returned.
    """
    labels = lrange(ngroups)
    repeats = n // ngroups
    values = np.asarray(np.tile(labels, repeats))

    shortfall = n - len(values)
    if shortfall > 0:
        # pad with the first few labels so the result has n entries
        padded = list(values) + labels[:shortfall]
        values = np.asarray(padded)

    random.shuffle(values)
    return values
def get_series():
    """
    Return a list of one-element Series, each built with a different
    kind of value or explicit dtype.
    """
    int_numpy = pd.Series([1], dtype='int64')
    int_nullable = pd.Series([1], dtype='Int64')
    floating = pd.Series([1.23])
    text = pd.Series(['foo'])
    boolean = pd.Series([True])
    naive_stamp = pd.Series([pd.Timestamp('2018-01-01')])
    aware_stamp = pd.Series([pd.Timestamp('2018-01-01', tz='US/Eastern')])
    return [int_numpy, int_nullable, floating, text, boolean,
            naive_stamp, aware_stamp]
def get_series_na():
    """
    Return a list of one-element Series whose only value is missing
    (NaN with an explicit dtype, or NaT).
    """
    nan_dtypes = ['Int64', 'float', 'object']
    all_na = [pd.Series([np.nan], dtype=dtype) for dtype in nan_dtypes]
    all_na.append(pd.Series([pd.NaT]))
    return all_na
@pytest.fixture(params=get_series(), ids=lambda ser: ser.dtype.name)
def series_of_dtype(request):
    """
    Parametrized fixture that hands out, one at a time, the Series
    returned by get_series(); each case is labelled with the dtype name.
    """
    current = request.param
    return current
@pytest.fixture(params=get_series(), ids=lambda ser: ser.dtype.name)
def series_of_dtype2(request):
    """
    Second copy of the series_of_dtype fixture, so a single test can
    request two independently parametrized Series.
    """
    current = request.param
    return current
@pytest.fixture(params=get_series_na(), ids=lambda ser: ser.dtype.name)
def series_of_dtype_all_na(request):
    """
    Parametrized fixture that hands out, one at a time, the all-NA
    Series returned by get_series_na(); each case is labelled with the
    dtype name.
    """
    current = request.param
    return current
class TestMerge(object):
def setup_method(self, method):
    """
    Build the frames shared by the tests: ``self.df`` / ``self.df2``
    (keyed on key1/key2) and ``self.left`` / ``self.right`` (string
    key column against a string index).
    """
    # aggregate multiple columns
    full = DataFrame({'key1': get_test_data(),
                      'key2': get_test_data(),
                      'data1': np.random.randn(N),
                      'data2': np.random.randn(N)})

    # exclude a couple keys for fun
    keep_rows = full['key2'] > 1
    self.df = full[keep_rows]

    small_n = N // 5
    self.df2 = DataFrame({
        'key1': get_test_data(n=small_n),
        'key2': get_test_data(ngroups=NGROUPS // 2, n=small_n),
        'value': np.random.randn(small_n),
    })

    left_keys = ['a', 'b', 'c', 'd', 'e', 'e', 'a']
    self.left = DataFrame({'key': left_keys,
                           'v1': np.random.randn(7)})

    right_labels = ['d', 'b', 'c', 'a']
    self.right = DataFrame({'v2': np.random.randn(4)},
                           index=right_labels)
def test_merge_inner_join_empty(self):
# GH 15328
df_empty = pd.DataFrame()
df_a = pd.DataFrame({'a': [1, 2]}, index=[0, 1], dtype='int64')
result = pd.merge(df_empty, df_a, left_index=True, right_index=True)
expected = pd.DataFrame({'a': []}, index=[], dtype='int64')
assert_frame_equal(result, expected)
def test_merge_common(self):
joined = merge(self.df, self.df2)
exp = merge(self.df, self.df2, on=['key1', 'key2'])
tm.assert_frame_equal(joined, exp)
def test_merge_index_as_on_arg(self):
# GH14355
left = self.df.set_index('key1')
right = self.df2.set_index('key1')
result = merge(left, right, on='key1')
expected = merge(self.df, self.df2, on='key1').set_index('key1')
assert_frame_equal(result, expected)
def test_merge_index_singlekey_right_vs_left(self):
left = DataFrame({'key': ['a', 'b', 'c', 'd', 'e', 'e', 'a'],
'v1': np.random.randn(7)})
right = DataFrame({'v2': np.random.randn(4)},
index=['d', 'b', 'c', 'a'])
merged1 = merge(left, right, left_on='key',
right_index=True, how='left', sort=False)
merged2 = merge(right, left, right_on='key',
left_index=True, how='right', sort=False)
assert_frame_equal(merged1, merged2.loc[:, merged1.columns])
merged1 = merge(left, right, left_on='key',
right_index=True, how='left', sort=True)
merged2 = merge(right, left, right_on='key',
left_index=True, how='right', sort=True)
assert_frame_equal(merged1, merged2.loc[:, merged1.columns])
def test_merge_index_singlekey_inner(self):
left = DataFrame({'key': ['a', 'b', 'c', 'd', 'e', 'e', 'a'],
'v1': np.random.randn(7)})
right = DataFrame({'v2': np.random.randn(4)},
index=['d', 'b', 'c', 'a'])
# inner join
result = merge(left, right, left_on='key', right_index=True,
how='inner')
expected = left.join(right, on='key').loc[result.index]
assert_frame_equal(result, expected)
result = merge(right, left, right_on='key', left_index=True,
how='inner')
expected = left.join(right, on='key').loc[result.index]
assert_frame_equal(result, expected.loc[:, result.columns])
def test_merge_misspecified(self):
    """
    Inconsistent key specifications raise MergeError or ValueError
    with the expected message.
    """
    merge_error = pd.errors.MergeError

    # index flag given for one side only
    with pytest.raises(merge_error,
                       match="Must pass right_on or right_index=True"):
        merge(self.left, self.right, left_index=True)
    with pytest.raises(merge_error,
                       match="Must pass left_on or left_index=True"):
        merge(self.left, self.right, right_index=True)

    # "on" combined with a side-specific key
    both_msg = ('Can only pass argument "on" OR "left_on" and "right_on", not'
                ' a combination of both')
    with pytest.raises(merge_error, match=both_msg):
        merge(self.left, self.left, left_on='key', on='key')

    # key lists of different lengths
    length_msg = r"len\(right_on\) must equal len\(left_on\)"
    with pytest.raises(ValueError, match=length_msg):
        merge(self.df, self.df2,
              left_on=['key1'], right_on=['key1', 'key2'])
def test_index_and_on_parameters_confusion(self):
    """
    Passing a list where left_index / right_index expects a bool
    raises ValueError naming the offending parameter.
    """
    keys = ['key1', 'key2']

    right_msg = ("right_index parameter must be of type bool, not"
                 r" <(class|type) 'list'>")
    with pytest.raises(ValueError, match=right_msg):
        merge(self.df, self.df2, how='left',
              left_index=False, right_index=list(keys))

    left_msg = ("left_index parameter must be of type bool, not "
                r"<(class|type) 'list'>")
    # the left_index message is expected whether right_index is valid or not
    for right_index in (False, list(keys)):
        with pytest.raises(ValueError, match=left_msg):
            merge(self.df, self.df2, how='left',
                  left_index=list(keys), right_index=right_index)
def test_merge_overlap(self):
merged = merge(self.left, self.left, on='key')
exp_len = (self.left['key'].value_counts() ** 2).sum()
assert len(merged) == exp_len
assert 'v1_x' in merged
assert 'v1_y' in merged
def test_merge_different_column_key_names(self):
left = DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'],
'value': [1, 2, 3, 4]})
right = DataFrame({'rkey': ['foo', 'bar', 'qux', 'foo'],
'value': [5, 6, 7, 8]})
merged = left.merge(right, left_on='lkey', right_on='rkey',
how='outer', sort=True)
exp = pd.Series(['bar', 'baz', 'foo', 'foo', 'foo', 'foo', np.nan],
name='lkey')
tm.assert_series_equal(merged['lkey'], exp)
exp = pd.Series(['bar', np.nan, 'foo', 'foo', 'foo', 'foo', 'qux'],
name='rkey')
tm.assert_series_equal(merged['rkey'], exp)
exp = pd.Series([2, 3, 1, 1, 4, 4, np.nan], name='value_x')
tm.assert_series_equal(merged['value_x'], exp)
exp = pd.Series([6, np.nan, 5, 8, 5, 8, 7], name='value_y')
tm.assert_series_equal(merged['value_y'], exp)
def test_merge_copy(self):
    """
    With copy=True, assigning columns on the merged frame leaves both
    input frames unchanged.
    """
    source_left = DataFrame({'a': 0, 'b': 1}, index=lrange(10))
    source_right = DataFrame({'c': 'foo', 'd': 'bar'}, index=lrange(10))

    options = dict(left_index=True, right_index=True, copy=True)
    merged = merge(source_left, source_right, **options)

    merged['a'] = 6
    untouched_left = source_left['a'] == 0
    assert untouched_left.all()

    merged['d'] = 'peekaboo'
    untouched_right = source_right['d'] == 'bar'
    assert untouched_right.all()
def test_merge_nocopy(self):
    """
    With copy=False, assigning columns on the merged frame is expected
    to show up in both input frames.
    """
    source_left = DataFrame({'a': 0, 'b': 1}, index=lrange(10))
    source_right = DataFrame({'c': 'foo', 'd': 'bar'}, index=lrange(10))

    options = dict(left_index=True, right_index=True, copy=False)
    merged = merge(source_left, source_right, **options)

    merged['a'] = 6
    changed_left = source_left['a'] == 6
    assert changed_left.all()

    merged['d'] = 'peekaboo'
    changed_right = source_right['d'] == 'peekaboo'
    assert changed_right.all()
def test_intelligently_handle_join_key(self):
    """
    An outer merge on 'key' returns a single 'key' column in the left
    frame's column position, with NaN where the left side has no match.
    """
    # #733, be a bit more 1337 about not returning unconsolidated DataFrame
    left_data = {'key': [1, 1, 2, 2, 3], 'value': lrange(5)}
    left = DataFrame(left_data, columns=['value', 'key'])

    right_data = {'key': [1, 1, 2, 3, 4, 5], 'rvalue': lrange(6)}
    right = DataFrame(right_data)

    expected_values = np.array([0, 0, 1, 1, 2, 3, 4, np.nan, np.nan])
    expected = DataFrame({'key': [1, 1, 1, 1, 2, 2, 3, 4, 5],
                          'value': expected_values,
                          'rvalue': [0, 1, 0, 1, 2, 2, 3, 4, 5]},
                         columns=['value', 'key', 'rvalue'])

    joined = merge(left, right, on='key', how='outer')
    assert_frame_equal(joined, expected)
def test_merge_join_key_dtype_cast(self):
# #8596
df1 = DataFrame({'key': [1], 'v1': [10]})
df2 = DataFrame({'key': [2], 'v1': [20]})
df = merge(df1, df2, how='outer')
assert df['key'].dtype == 'int64'
df1 = DataFrame({'key': [True], 'v1': [1]})
df2 = DataFrame({'key': [False], 'v1': [0]})
df = merge(df1, df2, how='outer')
# GH13169
# this really should be bool
assert df['key'].dtype == 'object'
df1 = DataFrame({'val': [1]})
df2 = DataFrame({'val': [2]})
lkey = np.array([1])
rkey = np.array([2])
df = merge(df1, df2, left_on=lkey, right_on=rkey, how='outer')
assert df['key_0'].dtype == 'int64'
def test_handle_join_key_pass_array(self):
    """
    Outer merges accept arrays as join keys: mixed with a column name,
    on both sides, and together with left_index=True.
    """
    # column name on one side, array on the other
    frame = DataFrame({'key': [1, 1, 2, 2, 3],
                       'value': lrange(5)}, columns=['value', 'key'])
    other = DataFrame({'rvalue': lrange(6)})
    other_key = np.array([1, 1, 2, 3, 4, 5])

    forward = merge(frame, other, how='outer',
                    left_on='key', right_on=other_key)
    backward = merge(other, frame, how='outer',
                     left_on=other_key, right_on='key')

    assert_series_equal(forward['key'], backward['key'])
    for outcome in (forward, backward):
        assert outcome['key'].notna().all()

    # arrays on both sides: the key lands in a generated 'key_0' column
    frame = DataFrame({'value': lrange(5)}, columns=['value'])
    other = DataFrame({'rvalue': lrange(6)})
    merged = merge(frame, other, how='outer',
                   left_on=np.array([1, 1, 2, 2, 3]),
                   right_on=np.array([1, 1, 2, 3, 4, 5]))
    expected_key = Series([1, 1, 1, 1, 2, 2, 3, 4, 5], name='key_0')
    tm.assert_series_equal(merged['key_0'], expected_key)

    # left index against an array key
    frame = DataFrame({'value': lrange(3)})
    other = DataFrame({'rvalue': lrange(6)})
    index_key = np.array([0, 1, 1, 2, 2, 3], dtype=np.int64)
    merged = merge(frame, other, how='outer',
                   left_index=True, right_on=index_key)
    tm.assert_series_equal(merged['key_0'],
                           Series(index_key, name='key_0'))
def test_no_overlap_more_informative_error(self):
    """
    Merging frames with no common columns raises MergeError with a
    message that lists the merge options in effect.
    """
    stamp = datetime.now()
    first = DataFrame({'x': ['a']}, index=[stamp])
    second = DataFrame({'y': ['b', 'c']}, index=[stamp, stamp])

    options = dict(lon=None, ron=None, lidx=False, ridx=False)
    msg = ('No common columns to perform merge on. '
           'Merge options: left_on={lon}, right_on={ron}, '
           'left_index={lidx}, right_index={ridx}'
           .format(**options))

    with pytest.raises(MergeError, match=msg):
        merge(first, second)
def test_merge_non_unique_indexes(self):
dt = datetime(2012, 5, 1)
dt2 = datetime(2012, 5, 2)
dt3 = datetime(2012, 5, 3)
Loading ...