# -*- coding: utf-8 -*-
from collections import deque
from datetime import datetime
import operator
import numpy as np
import pytest
from pandas.compat import range
import pandas as pd
from pandas.tests.frame.common import _check_mixed_float, _check_mixed_int
import pandas.util.testing as tm
# -------------------------------------------------------------------
# Comparisons
class TestFrameComparisons(object):
# Specifically _not_ flex-comparisons
def test_comparison_invalid(self):
def check(df, df2):
for (x, y) in [(df, df2), (df2, df)]:
# we expect the result to match Series comparisons for
# == and !=, inequalities should raise
result = x == y
expected = pd.DataFrame({col: x[col] == y[col]
for col in x.columns},
index=x.index, columns=x.columns)
tm.assert_frame_equal(result, expected)
result = x != y
expected = pd.DataFrame({col: x[col] != y[col]
for col in x.columns},
index=x.index, columns=x.columns)
tm.assert_frame_equal(result, expected)
with pytest.raises(TypeError):
x >= y
with pytest.raises(TypeError):
x > y
with pytest.raises(TypeError):
x < y
with pytest.raises(TypeError):
x <= y
# GH4968
# invalid date/int comparisons
df = pd.DataFrame(np.random.randint(10, size=(10, 1)), columns=['a'])
df['dates'] = pd.date_range('20010101', periods=len(df))
df2 = df.copy()
df2['dates'] = df['a']
check(df, df2)
df = pd.DataFrame(np.random.randint(10, size=(10, 2)),
columns=['a', 'b'])
df2 = pd.DataFrame({'a': pd.date_range('20010101', periods=len(df)),
'b': pd.date_range('20100101', periods=len(df))})
check(df, df2)
def test_timestamp_compare(self):
# make sure we can compare Timestamps on the right AND left hand side
# GH#4982
df = pd. DataFrame({'dates1': pd.date_range('20010101', periods=10),
'dates2': pd.date_range('20010102', periods=10),
'intcol': np.random.randint(1000000000, size=10),
'floatcol': np.random.randn(10),
'stringcol': list(tm.rands(10))})
df.loc[np.random.rand(len(df)) > 0.5, 'dates2'] = pd.NaT
ops = {'gt': 'lt', 'lt': 'gt', 'ge': 'le', 'le': 'ge', 'eq': 'eq',
'ne': 'ne'}
for left, right in ops.items():
left_f = getattr(operator, left)
right_f = getattr(operator, right)
# no nats
if left in ['eq', 'ne']:
expected = left_f(df, pd.Timestamp('20010109'))
result = right_f(pd.Timestamp('20010109'), df)
tm.assert_frame_equal(result, expected)
else:
with pytest.raises(TypeError):
left_f(df, pd.Timestamp('20010109'))
with pytest.raises(TypeError):
right_f(pd.Timestamp('20010109'), df)
# nats
expected = left_f(df, pd.Timestamp('nat'))
result = right_f(pd.Timestamp('nat'), df)
tm.assert_frame_equal(result, expected)
def test_mixed_comparison(self):
# GH#13128, GH#22163 != datetime64 vs non-dt64 should be False,
# not raise TypeError
# (this appears to be fixed before GH#22163, not sure when)
df = pd.DataFrame([['1989-08-01', 1], ['1989-08-01', 2]])
other = pd.DataFrame([['a', 'b'], ['c', 'd']])
result = df == other
assert not result.any().any()
result = df != other
assert result.all().all()
def test_df_boolean_comparison_error(self):
# GH#4576, GH#22880
# comparing DataFrame against list/tuple with len(obj) matching
# len(df.columns) is supported as of GH#22800
df = pd.DataFrame(np.arange(6).reshape((3, 2)))
expected = pd.DataFrame([[False, False],
[True, False],
[False, False]])
result = df == (2, 2)
tm.assert_frame_equal(result, expected)
result = df == [2, 2]
tm.assert_frame_equal(result, expected)
def test_df_float_none_comparison(self):
df = pd.DataFrame(np.random.randn(8, 3), index=range(8),
columns=['A', 'B', 'C'])
result = df.__eq__(None)
assert not result.any().any()
def test_df_string_comparison(self):
df = pd.DataFrame([{"a": 1, "b": "foo"}, {"a": 2, "b": "bar"}])
mask_a = df.a > 1
tm.assert_frame_equal(df[mask_a], df.loc[1:1, :])
tm.assert_frame_equal(df[-mask_a], df.loc[0:0, :])
mask_b = df.b == "foo"
tm.assert_frame_equal(df[mask_b], df.loc[0:0, :])
tm.assert_frame_equal(df[-mask_b], df.loc[1:1, :])
class TestFrameFlexComparisons(object):
    """Tests for the flex comparison methods (``eq``, ``ne``, ``gt``, ...)."""

    # TODO: test_bool_flex_frame needs a better name
    def test_bool_flex_frame(self):
        """Flex comparisons agree with their ``operator`` counterparts.

        Covers DataFrame, unaligned DataFrame, ndarray, scalar and NaN
        operands, Series/array operands along both axes, NaN cells, and
        complex / object-dtype values.
        """
        data = np.random.randn(5, 3)
        other_data = np.random.randn(5, 3)
        df = pd.DataFrame(data)
        other = pd.DataFrame(other_data)
        # Shape (5, 3, 1, 3): too many dimensions to be coerced to a frame.
        too_many_dims = np.ones(df.shape + (1, 3))

        # Unaligned
        def _check_unaligned_frame(meth, op, df, other):
            # Only a corner of ``other`` is passed; the flex method must
            # give the same result as the operator on the reindexed frame.
            part_o = other.loc[3:, 1:].copy()
            rs = meth(part_o)
            xp = op(df, part_o.reindex(index=df.index, columns=df.columns))
            tm.assert_frame_equal(rs, xp)

        # DataFrame
        assert df.eq(df).values.all()
        assert not df.ne(df).values.any()

        msg = "Unable to coerce to Series/DataFrame"
        for op in ['eq', 'ne', 'gt', 'lt', 'ge', 'le']:
            f = getattr(df, op)
            o = getattr(operator, op)

            # No NAs
            tm.assert_frame_equal(f(other), o(df, other))
            _check_unaligned_frame(f, o, df, other)

            # ndarray
            tm.assert_frame_equal(f(other.values), o(df, other.values))

            # scalar
            tm.assert_frame_equal(f(0), o(df, 0))

            # NAs
            tm.assert_frame_equal(f(np.nan), o(df, np.nan))
            with pytest.raises(ValueError, match=msg):
                f(too_many_dims)

        # Series
        def _test_seq(df, idx_ser, col_ser):
            # ``idx_ser`` is aligned along the index (axis=0), ``col_ser``
            # along the columns (the default axis).
            idx_eq = df.eq(idx_ser, axis=0)
            col_eq = df.eq(col_ser)
            idx_ne = df.ne(idx_ser, axis=0)
            col_ne = df.ne(col_ser)
            tm.assert_frame_equal(col_eq, df == pd.Series(col_ser))
            # ``~`` (not unary ``-``) is the boolean inversion operator.
            tm.assert_frame_equal(col_eq, ~col_ne)
            tm.assert_frame_equal(idx_eq, ~idx_ne)
            tm.assert_frame_equal(idx_eq, df.T.eq(idx_ser).T)
            tm.assert_frame_equal(col_eq, df.eq(list(col_ser)))
            tm.assert_frame_equal(idx_eq, df.eq(pd.Series(idx_ser), axis=0))
            tm.assert_frame_equal(idx_eq, df.eq(list(idx_ser), axis=0))

            idx_gt = df.gt(idx_ser, axis=0)
            col_gt = df.gt(col_ser)
            idx_le = df.le(idx_ser, axis=0)
            col_le = df.le(col_ser)
            tm.assert_frame_equal(col_gt, df > pd.Series(col_ser))
            tm.assert_frame_equal(col_gt, ~col_le)
            tm.assert_frame_equal(idx_gt, ~idx_le)
            tm.assert_frame_equal(idx_gt, df.T.gt(idx_ser).T)

            idx_ge = df.ge(idx_ser, axis=0)
            col_ge = df.ge(col_ser)
            idx_lt = df.lt(idx_ser, axis=0)
            col_lt = df.lt(col_ser)
            tm.assert_frame_equal(col_ge, df >= pd.Series(col_ser))
            tm.assert_frame_equal(col_ge, ~col_lt)
            tm.assert_frame_equal(idx_ge, ~idx_lt)
            tm.assert_frame_equal(idx_ge, df.T.ge(idx_ser).T)

        idx_ser = pd.Series(np.random.randn(5))
        col_ser = pd.Series(np.random.randn(3))
        _test_seq(df, idx_ser, col_ser)

        # list/tuple
        _test_seq(df, idx_ser.values, col_ser.values)

        # NA
        df.loc[0, 0] = np.nan
        # A NaN cell compares unequal to itself; every other comparison
        # at that position is False.
        rs = df.ne(df)
        assert rs.loc[0, 0]
        for name in ['eq', 'gt', 'lt', 'ge', 'le']:
            rs = getattr(df, name)(df)
            assert not rs.loc[0, 0]

        # complex
        arr = np.array([np.nan, 1, 6, np.nan])
        arr2 = np.array([2j, np.nan, 7, None])
        df = pd.DataFrame({'a': arr})
        df2 = pd.DataFrame({'a': arr2})
        rs = df.gt(df2)
        assert not rs.values.any()
        rs = df.ne(df2)
        assert rs.values.all()

        arr3 = np.array([2j, np.nan, None])
        df3 = pd.DataFrame({'a': arr3})
        rs = df3.gt(2j)
        assert not rs.values.any()

        # corner, dtype=object
        df1 = pd.DataFrame({'col': ['foo', np.nan, 'bar']})
        df2 = pd.DataFrame({'col': ['foo', datetime.now(), 'bar']})
        result = df1.ne(df2)
        exp = pd.DataFrame({'col': [False, True, False]})
        tm.assert_frame_equal(result, exp)

    def test_flex_comparison_nat(self):
        """``df.eq(NaT)`` / ``df.ne(NaT)`` behave like ``==`` / ``!=``."""
        # GH 15697, GH 22163 df.eq(pd.NaT) should behave like df == pd.NaT,
        # and _definitely_ not be NaN
        df = pd.DataFrame([pd.NaT])

        result = df == pd.NaT
        # result.iloc[0, 0] is a np.bool_ object
        assert result.iloc[0, 0].item() is False

        result = df.eq(pd.NaT)
        assert result.iloc[0, 0].item() is False

        result = df != pd.NaT
        assert result.iloc[0, 0].item() is True

        result = df.ne(pd.NaT)
        assert result.iloc[0, 0].item() is True

    @pytest.mark.parametrize('opname', ['eq', 'ne', 'gt', 'lt', 'ge', 'le'])
    def test_df_flex_cmp_constant_return_types(self, opname):
        """Comparing a non-empty frame with a constant yields bool columns."""
        # GH 15077, non-empty DataFrame
        df = pd.DataFrame({'x': [1, 2, 3], 'y': [1., 2., 3.]})
        const = 2

        result = getattr(df, opname)(const)
        # Inspect ``dtypes`` directly: ``get_dtype_counts`` is deprecated
        # and absent from newer pandas releases.
        assert result.dtypes.tolist() == [np.dtype(bool)] * 2

    @pytest.mark.parametrize('opname', ['eq', 'ne', 'gt', 'lt', 'ge', 'le'])
    def test_df_flex_cmp_constant_return_types_empty(self, opname):
        """Comparing an empty frame with a constant yields bool columns."""
        # GH 15077 empty DataFrame
        df = pd.DataFrame({'x': [1, 2, 3], 'y': [1., 2., 3.]})
        const = 2

        # Zero rows, but both columns are kept.
        empty = df.iloc[:0]
        result = getattr(empty, opname)(const)
        # Inspect ``dtypes`` directly: ``get_dtype_counts`` is deprecated
        # and absent from newer pandas releases.
        assert result.dtypes.tolist() == [np.dtype(bool)] * 2
# -------------------------------------------------------------------
# Arithmetic
class TestFrameFlexArithmetic(object):
def test_df_add_td64_columnwise(self):
# GH 22534 Check that column-wise addition broadcasts correctly
dti = pd.date_range('2016-01-01', periods=10)
tdi = pd.timedelta_range('1', periods=10)
tser = pd.Series(tdi)
df = pd.DataFrame({0: dti, 1: tdi})
result = df.add(tser, axis=0)
expected = pd.DataFrame({0: dti + tdi,
1: tdi + tdi})
tm.assert_frame_equal(result, expected)
def test_df_add_flex_filled_mixed_dtypes(self):
# GH 19611
dti = pd.date_range('2016-01-01', periods=3)
ser = pd.Series(['1 Day', 'NaT', '2 Days'], dtype='timedelta64[ns]')
df = pd.DataFrame({'A': dti, 'B': ser})
other = pd.DataFrame({'A': ser, 'B': ser})
fill = pd.Timedelta(days=1).to_timedelta64()
result = df.add(other, fill_value=fill)
expected = pd.DataFrame(
{'A': pd.Series(['2016-01-02', '2016-01-03', '2016-01-05'],
dtype='datetime64[ns]'),
'B': ser * 2})
tm.assert_frame_equal(result, expected)
def test_arith_flex_frame(self, all_arithmetic_operators, float_frame,
                          mixed_float_frame):
    """Flex arithmetic dunders match the ``operator`` module equivalents.

    ``all_arithmetic_operators`` supplies a dunder name such as
    ``'__add__'`` or ``'__radd__'``; the two frame arguments are
    presumably pytest fixtures defined elsewhere in the test suite.
    """
    # one instance of parametrized fixture
    op = all_arithmetic_operators

    def reference(left, right):
        # r-versions not in operator-stdlib; get op without "r" and invert
        if not op.startswith('__r'):
            return getattr(operator, op)(left, right)
        return getattr(operator, op.replace('__r', '__'))(right, left)

    doubled = 2 * float_frame
    tm.assert_frame_equal(getattr(float_frame, op)(doubled),
                          reference(float_frame, doubled))

    # vs mix float
    doubled_mixed = 2 * mixed_float_frame
    result = getattr(mixed_float_frame, op)(doubled_mixed)
    tm.assert_frame_equal(result,
                          reference(mixed_float_frame, doubled_mixed))
    # NOTE(review): helper imported from pandas.tests.frame.common; its
    # exact dtype expectations are not visible here.
    _check_mixed_float(result, dtype={'C': None})
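# NOTE(review): extraction artifact ("Loading ...") -- the rest of this file appears to be missing from this point on.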