# coding=utf-8
# pylint: disable-msg=E1101,W0612
from datetime import datetime
import numpy as np
from numpy import nan
import pytest
import pandas as pd
from pandas import DataFrame, DatetimeIndex, Series, compat, date_range
import pandas.util.testing as tm
from pandas.util.testing import assert_frame_equal, assert_series_equal
class TestSeriesCombine(object):
def test_append(self, datetime_series, string_series, object_series):
    """Check that ``Series.append`` keeps every value and rejects overlap.

    The three series arguments are supplied by the test harness
    (presumably pytest fixtures defined outside this file -- confirm).
    """
    combined = string_series.append(object_series)

    # Each label of the result has to come from one of the two inputs and
    # must still carry the value it had there.
    for label, item in compat.iteritems(combined):
        if label in string_series.index:
            origin = string_series
        elif label in object_series.index:
            origin = object_series
        else:
            raise AssertionError("orphaned index!")
        assert item == origin[label]

    # Appending a series to itself repeats every label, which
    # ``verify_integrity=True`` is expected to refuse.
    overlap_msg = "Indexes have overlapping values:"
    with pytest.raises(ValueError, match=overlap_msg):
        datetime_series.append(datetime_series, verify_integrity=True)
def test_append_many(self, datetime_series):
pieces = [datetime_series[:5], datetime_series[5:10],
datetime_series[10:]]
result = pieces[0].append(pieces[1:])
assert_series_equal(result, datetime_series)
def test_append_duplicates(self):
    """``append``/``concat`` of two series sharing the labels 0, 1, 2."""
    # GH 13677
    left = pd.Series([1, 2, 3])
    right = pd.Series([4, 5, 6])

    # By default the labels of both inputs are kept, duplicates included.
    stacked = pd.Series([1, 2, 3, 4, 5, 6], index=[0, 1, 2, 0, 1, 2])
    tm.assert_series_equal(left.append(right), stacked)
    tm.assert_series_equal(pd.concat([left, right]), stacked)

    # the result must have RangeIndex
    renumbered = pd.Series([1, 2, 3, 4, 5, 6])
    via_append = left.append(right, ignore_index=True)
    tm.assert_series_equal(via_append, renumbered, check_index_type=True)
    via_concat = pd.concat([left, right], ignore_index=True)
    tm.assert_series_equal(via_concat, renumbered, check_index_type=True)

    # With integrity checking the duplicated labels must raise.
    overlap_msg = 'Indexes have overlapping values:'
    with pytest.raises(ValueError, match=overlap_msg):
        left.append(right, verify_integrity=True)
    with pytest.raises(ValueError, match=overlap_msg):
        pd.concat([left, right], verify_integrity=True)
def test_combine_scalar(self):
# GH 21248
# Note - combine() with another Series is tested elsewhere because
# it is used when testing operators
s = pd.Series([i * 10 for i in range(5)])
result = s.combine(3, lambda x, y: x + y)
expected = pd.Series([i * 10 + 3 for i in range(5)])
tm.assert_series_equal(result, expected)
result = s.combine(22, lambda x, y: min(x, y))
expected = pd.Series([min(i * 10, 22) for i in range(5)])
tm.assert_series_equal(result, expected)
def test_combine_first(self):
values = tm.makeIntIndex(20).values.astype(float)
series = Series(values, index=tm.makeIntIndex(20))
series_copy = series * 2
series_copy[::2] = np.NaN
# nothing used from the input
combined = series.combine_first(series_copy)
tm.assert_series_equal(combined, series)
# Holes filled from input
combined = series_copy.combine_first(series)
assert np.isfinite(combined).all()
tm.assert_series_equal(combined[::2], series[::2])
tm.assert_series_equal(combined[1::2], series_copy[1::2])
# mixed types
index = tm.makeStringIndex(20)
floats = Series(tm.randn(20), index=index)
strings = Series(tm.makeStringIndex(10), index=index[::2])
combined = strings.combine_first(floats)
tm.assert_series_equal(strings, combined.loc[index[::2]])
tm.assert_series_equal(floats[1::2].astype(object),
combined.loc[index[1::2]])
# corner case
s = Series([1., 2, 3], index=[0, 1, 2])
result = s.combine_first(Series([], index=[]))
assert_series_equal(s, result)
def test_update(self):
s = Series([1.5, nan, 3., 4., nan])
s2 = Series([nan, 3.5, nan, 5.])
s.update(s2)
expected = Series([1.5, 3.5, 3., 5., np.nan])
assert_series_equal(s, expected)
# GH 3217
df = DataFrame([{"a": 1}, {"a": 3, "b": 2}])
df['c'] = np.nan
df['c'].update(Series(['foo'], index=[0]))
expected = DataFrame([[1, np.nan, 'foo'], [3, 2., np.nan]],
columns=['a', 'b', 'c'])
assert_frame_equal(df, expected)
@pytest.mark.parametrize('other, dtype, expected', [
    # other is int
    ([61, 63], 'int32', pd.Series([10, 61, 12], dtype='int32')),
    ([61, 63], 'int64', pd.Series([10, 61, 12])),
    ([61, 63], float, pd.Series([10., 61., 12.])),
    ([61, 63], object, pd.Series([10, 61, 12], dtype=object)),
    # other is float, but can be cast to int
    ([61., 63.], 'int32', pd.Series([10, 61, 12], dtype='int32')),
    ([61., 63.], 'int64', pd.Series([10, 61, 12])),
    ([61., 63.], float, pd.Series([10., 61., 12.])),
    ([61., 63.], object, pd.Series([10, 61., 12], dtype=object)),
    # other is float, cannot be cast to int
    ([61.1, 63.1], 'int32', pd.Series([10., 61.1, 12.])),
    ([61.1, 63.1], 'int64', pd.Series([10., 61.1, 12.])),
    ([61.1, 63.1], float, pd.Series([10., 61.1, 12.])),
    ([61.1, 63.1], object, pd.Series([10, 61.1, 12], dtype=object)),
    # other is object, cannot be cast
    ([(61,), (63,)], 'int32', pd.Series([10, (61,), 12])),
    ([(61,), (63,)], 'int64', pd.Series([10, (61,), 12])),
    ([(61,), (63,)], float, pd.Series([10., (61,), 12.])),
    ([(61,), (63,)], object, pd.Series([10, (61,), 12]))
])
def test_update_dtypes(self, other, dtype, expected):
    """Result dtype of ``Series.update`` for each ``other``/``dtype`` pair.

    ``other`` holds the raw update values, ``dtype`` is the dtype the
    updated series starts with and ``expected`` is the series it must
    equal afterwards.
    """
    target = Series([10, 11, 12], dtype=dtype)

    # The update is labelled 1 and 3, while ``target`` has the default
    # labels 0..2; per the expected values only label 1 changes.
    target.update(Series(other, index=[1, 3]))

    assert_series_equal(target, expected)
def test_concat_empty_series_dtypes_roundtrips(self):
# round-tripping with self & like self
dtypes = map(np.dtype, ['float64', 'int8', 'uint8', 'bool', 'm8[ns]',
'M8[ns]'])
for dtype in dtypes:
assert pd.concat([Series(dtype=dtype)]).dtype == dtype
assert pd.concat([Series(dtype=dtype),
Series(dtype=dtype)]).dtype == dtype
def int_result_type(dtype, dtype2):
typs = {dtype.kind, dtype2.kind}
if not len(typs - {'i', 'u', 'b'}) and (dtype.kind == 'i' or
dtype2.kind == 'i'):
return 'i'
elif not len(typs - {'u', 'b'}) and (dtype.kind == 'u' or
dtype2.kind == 'u'):
return 'u'
return None
def float_result_type(dtype, dtype2):
typs = {dtype.kind, dtype2.kind}
if not len(typs - {'f', 'i', 'u'}) and (dtype.kind == 'f' or
dtype2.kind == 'f'):
return 'f'
return None
def get_result_type(dtype, dtype2):
result = float_result_type(dtype, dtype2)
if result is not None:
return result
result = int_result_type(dtype, dtype2)
if result is not None:
return result
return 'O'
for dtype in dtypes:
for dtype2 in dtypes:
if dtype == dtype2:
continue
expected = get_result_type(dtype, dtype2)
result = pd.concat([Series(dtype=dtype), Series(dtype=dtype2)
]).dtype
assert result.kind == expected
def test_combine_first_dt_tz_values(self, tz_naive_fixture):
ser1 = pd.Series(pd.DatetimeIndex(['20150101', '20150102', '20150103'],
tz=tz_naive_fixture),
name='ser1')
ser2 = pd.Series(pd.DatetimeIndex(['20160514', '20160515', '20160516'],
tz=tz_naive_fixture),
index=[2, 3, 4], name='ser2')
result = ser1.combine_first(ser2)
exp_vals = pd.DatetimeIndex(['20150101', '20150102', '20150103',
'20160515', '20160516'],
tz=tz_naive_fixture)
exp = pd.Series(exp_vals, name='ser1')
assert_series_equal(exp, result)
def test_concat_empty_series_dtypes(self):
# booleans
assert pd.concat([Series(dtype=np.bool_),
Series(dtype=np.int32)]).dtype == np.int32
assert pd.concat([Series(dtype=np.bool_),
Series(dtype=np.float32)]).dtype == np.object_
# datetime-like
assert pd.concat([Series(dtype='m8[ns]'),
Series(dtype=np.bool)]).dtype == np.object_
assert pd.concat([Series(dtype='m8[ns]'),
Series(dtype=np.int64)]).dtype == np.object_
assert pd.concat([Series(dtype='M8[ns]'),
Series(dtype=np.bool)]).dtype == np.object_
assert pd.concat([Series(dtype='M8[ns]'),
Series(dtype=np.int64)]).dtype == np.object_
assert pd.concat([Series(dtype='M8[ns]'),
Series(dtype=np.bool_),
Series(dtype=np.int64)]).dtype == np.object_
# categorical
assert pd.concat([Series(dtype='category'),
Series(dtype='category')]).dtype == 'category'
# GH 18515
assert pd.concat([Series(np.array([]), dtype='category'),
Series(dtype='float64')]).dtype == 'float64'
assert pd.concat([Series(dtype='category'),
Series(dtype='object')]).dtype == 'object'
# sparse
# TODO: move?
result = pd.concat([Series(dtype='float64').to_sparse(), Series(
dtype='float64').to_sparse()])
assert result.dtype == 'Sparse[float64]'
assert result.ftype == 'float64:sparse'
result = pd.concat([Series(dtype='float64').to_sparse(), Series(
dtype='float64')])
# TODO: release-note: concat sparse dtype
expected = pd.core.sparse.api.SparseDtype(np.float64)
assert result.dtype == expected
assert result.ftype == 'float64:sparse'
result = pd.concat([Series(dtype='float64').to_sparse(), Series(
dtype='object')])
# TODO: release-note: concat sparse dtype
expected = pd.core.sparse.api.SparseDtype('object')
assert result.dtype == expected
assert result.ftype == 'object:sparse'
def test_combine_first_dt64(self):
from pandas.core.tools.datetimes import to_datetime
s0 = to_datetime(Series(["2010", np.NaN]))
s1 = to_datetime(Series([np.NaN, "2011"]))
rs = s0.combine_first(s1)
xp = to_datetime(Series(['2010', '2011']))
assert_series_equal(rs, xp)
s0 = to_datetime(Series(["2010", np.NaN]))
s1 = Series([np.NaN, "2011"])
rs = s0.combine_first(s1)
xp = Series([datetime(2010, 1, 1), '2011'])
assert_series_equal(rs, xp)
class TestTimeseries(object):
def test_append_concat(self):
rng = date_range('5/8/2012 1:45', periods=10, freq='5T')
ts = Series(np.random.randn(len(rng)), rng)
df = DataFrame(np.random.randn(len(rng), 4), index=rng)
result = ts.append(ts)
result_df = df.append(df)
ex_index = DatetimeIndex(np.tile(rng.values, 2))
tm.assert_index_equal(result.index, ex_index)
tm.assert_index_equal(result_df.index, ex_index)
appended = rng.append(rng)
tm.assert_index_equal(appended, ex_index)
appended = rng.append([rng, rng])
ex_index = DatetimeIndex(np.tile(rng.values, 3))
tm.assert_index_equal(appended, ex_index)
# different index names
rng1 = rng.copy()
rng2 = rng.copy()
rng1.name = 'foo'
rng2.name = 'bar'
assert rng1.append(rng1).name == 'foo'
assert rng1.append(rng2).name is None
def test_append_concat_tz(self):
# see gh-2938
rng = date_range('5/8/2012 1:45', periods=10, freq='5T',
tz='US/Eastern')
rng2 = date_range('5/8/2012 2:35', periods=10, freq='5T',
tz='US/Eastern')
rng3 = date_range('5/8/2012 1:45', periods=20, freq='5T',
tz='US/Eastern')
ts = Series(np.random.randn(len(rng)), rng)
df = DataFrame(np.random.randn(len(rng), 4), index=rng)
ts2 = Series(np.random.randn(len(rng2)), rng2)
df2 = DataFrame(np.random.randn(len(rng2), 4), index=rng2)
result = ts.append(ts2)
result_df = df.append(df2)
tm.assert_index_equal(result.index, rng3)
tm.assert_index_equal(result_df.index, rng3)
appended = rng.append(rng2)
tm.assert_index_equal(appended, rng3)
def test_append_concat_tz_explicit_pytz(self):
# see gh-2938
from pytz import timezone as timezone
rng = date_range('5/8/2012 1:45', periods=10, freq='5T',
tz=timezone('US/Eastern'))
rng2 = date_range('5/8/2012 2:35', periods=10, freq='5T',
tz=timezone('US/Eastern'))
rng3 = date_range('5/8/2012 1:45', periods=20, freq='5T',
tz=timezone('US/Eastern'))
ts = Series(np.random.randn(len(rng)), rng)
df = DataFrame(np.random.randn(len(rng), 4), index=rng)
ts2 = Series(np.random.randn(len(rng2)), rng2)
df2 = DataFrame(np.random.randn(len(rng2), 4), index=rng2)
Loading ...