# -*- coding: utf-8 -*-
from datetime import datetime
from itertools import permutations
import struct
import numpy as np
from numpy import nan
from numpy.random import RandomState
import pytest
from pandas._libs import (
algos as libalgos, groupby as libgroupby, hashtable as ht)
from pandas.compat import lrange, range
from pandas.compat.numpy import np_array_datetime64_compat
import pandas.util._test_decorators as td
from pandas.core.dtypes.dtypes import CategoricalDtype as CDT
import pandas as pd
from pandas import (
Categorical, CategoricalIndex, DatetimeIndex, Index, IntervalIndex, Series,
Timestamp, compat)
import pandas.core.algorithms as algos
from pandas.core.arrays import DatetimeArray
import pandas.core.common as com
import pandas.util.testing as tm
from pandas.util.testing import assert_almost_equal
class TestMatch(object):
def test_ints(self):
values = np.array([0, 2, 1])
to_match = np.array([0, 1, 2, 2, 0, 1, 3, 0])
result = algos.match(to_match, values)
expected = np.array([0, 2, 1, 1, 0, 2, -1, 0], dtype=np.int64)
tm.assert_numpy_array_equal(result, expected)
result = Series(algos.match(to_match, values, np.nan))
expected = Series(np.array([0, 2, 1, 1, 0, 2, np.nan, 0]))
tm.assert_series_equal(result, expected)
s = Series(np.arange(5), dtype=np.float32)
result = algos.match(s, [2, 4])
expected = np.array([-1, -1, 0, -1, 1], dtype=np.int64)
tm.assert_numpy_array_equal(result, expected)
result = Series(algos.match(s, [2, 4], np.nan))
expected = Series(np.array([np.nan, np.nan, 0, np.nan, 1]))
tm.assert_series_equal(result, expected)
def test_strings(self):
values = ['foo', 'bar', 'baz']
to_match = ['bar', 'foo', 'qux', 'foo', 'bar', 'baz', 'qux']
result = algos.match(to_match, values)
expected = np.array([1, 0, -1, 0, 1, 2, -1], dtype=np.int64)
tm.assert_numpy_array_equal(result, expected)
result = Series(algos.match(to_match, values, np.nan))
expected = Series(np.array([1, 0, np.nan, 0, 1, 2, np.nan]))
tm.assert_series_equal(result, expected)
class TestFactorize(object):
def test_basic(self):
labels, uniques = algos.factorize(['a', 'b', 'b', 'a', 'a', 'c', 'c',
'c'])
tm.assert_numpy_array_equal(
uniques, np.array(['a', 'b', 'c'], dtype=object))
labels, uniques = algos.factorize(['a', 'b', 'b', 'a',
'a', 'c', 'c', 'c'], sort=True)
exp = np.array([0, 1, 1, 0, 0, 2, 2, 2], dtype=np.intp)
tm.assert_numpy_array_equal(labels, exp)
exp = np.array(['a', 'b', 'c'], dtype=object)
tm.assert_numpy_array_equal(uniques, exp)
labels, uniques = algos.factorize(list(reversed(range(5))))
exp = np.array([0, 1, 2, 3, 4], dtype=np.intp)
tm.assert_numpy_array_equal(labels, exp)
exp = np.array([4, 3, 2, 1, 0], dtype=np.int64)
tm.assert_numpy_array_equal(uniques, exp)
labels, uniques = algos.factorize(list(reversed(range(5))), sort=True)
exp = np.array([4, 3, 2, 1, 0], dtype=np.intp)
tm.assert_numpy_array_equal(labels, exp)
exp = np.array([0, 1, 2, 3, 4], dtype=np.int64)
tm.assert_numpy_array_equal(uniques, exp)
labels, uniques = algos.factorize(list(reversed(np.arange(5.))))
exp = np.array([0, 1, 2, 3, 4], dtype=np.intp)
tm.assert_numpy_array_equal(labels, exp)
exp = np.array([4., 3., 2., 1., 0.], dtype=np.float64)
tm.assert_numpy_array_equal(uniques, exp)
labels, uniques = algos.factorize(list(reversed(np.arange(5.))),
sort=True)
exp = np.array([4, 3, 2, 1, 0], dtype=np.intp)
tm.assert_numpy_array_equal(labels, exp)
exp = np.array([0., 1., 2., 3., 4.], dtype=np.float64)
tm.assert_numpy_array_equal(uniques, exp)
def test_mixed(self):
# doc example reshaping.rst
x = Series(['A', 'A', np.nan, 'B', 3.14, np.inf])
labels, uniques = algos.factorize(x)
exp = np.array([0, 0, -1, 1, 2, 3], dtype=np.intp)
tm.assert_numpy_array_equal(labels, exp)
exp = Index(['A', 'B', 3.14, np.inf])
tm.assert_index_equal(uniques, exp)
labels, uniques = algos.factorize(x, sort=True)
exp = np.array([2, 2, -1, 3, 0, 1], dtype=np.intp)
tm.assert_numpy_array_equal(labels, exp)
exp = Index([3.14, np.inf, 'A', 'B'])
tm.assert_index_equal(uniques, exp)
def test_datelike(self):
# M8
v1 = Timestamp('20130101 09:00:00.00004')
v2 = Timestamp('20130101')
x = Series([v1, v1, v1, v2, v2, v1])
labels, uniques = algos.factorize(x)
exp = np.array([0, 0, 0, 1, 1, 0], dtype=np.intp)
tm.assert_numpy_array_equal(labels, exp)
exp = DatetimeIndex([v1, v2])
tm.assert_index_equal(uniques, exp)
labels, uniques = algos.factorize(x, sort=True)
exp = np.array([1, 1, 1, 0, 0, 1], dtype=np.intp)
tm.assert_numpy_array_equal(labels, exp)
exp = DatetimeIndex([v2, v1])
tm.assert_index_equal(uniques, exp)
# period
v1 = pd.Period('201302', freq='M')
v2 = pd.Period('201303', freq='M')
x = Series([v1, v1, v1, v2, v2, v1])
# periods are not 'sorted' as they are converted back into an index
labels, uniques = algos.factorize(x)
exp = np.array([0, 0, 0, 1, 1, 0], dtype=np.intp)
tm.assert_numpy_array_equal(labels, exp)
tm.assert_index_equal(uniques, pd.PeriodIndex([v1, v2]))
labels, uniques = algos.factorize(x, sort=True)
exp = np.array([0, 0, 0, 1, 1, 0], dtype=np.intp)
tm.assert_numpy_array_equal(labels, exp)
tm.assert_index_equal(uniques, pd.PeriodIndex([v1, v2]))
# GH 5986
v1 = pd.to_timedelta('1 day 1 min')
v2 = pd.to_timedelta('1 day')
x = Series([v1, v2, v1, v1, v2, v2, v1])
labels, uniques = algos.factorize(x)
exp = np.array([0, 1, 0, 0, 1, 1, 0], dtype=np.intp)
tm.assert_numpy_array_equal(labels, exp)
tm.assert_index_equal(uniques, pd.to_timedelta([v1, v2]))
labels, uniques = algos.factorize(x, sort=True)
exp = np.array([1, 0, 1, 1, 0, 0, 1], dtype=np.intp)
tm.assert_numpy_array_equal(labels, exp)
tm.assert_index_equal(uniques, pd.to_timedelta([v2, v1]))
def test_factorize_nan(self):
# nan should map to na_sentinel, not reverse_indexer[na_sentinel]
# rizer.factorize should not raise an exception if na_sentinel indexes
# outside of reverse_indexer
key = np.array([1, 2, 1, np.nan], dtype='O')
rizer = ht.Factorizer(len(key))
for na_sentinel in (-1, 20):
ids = rizer.factorize(key, sort=True, na_sentinel=na_sentinel)
expected = np.array([0, 1, 0, na_sentinel], dtype='int32')
assert len(set(key)) == len(set(expected))
tm.assert_numpy_array_equal(pd.isna(key),
expected == na_sentinel)
# nan still maps to na_sentinel when sort=False
key = np.array([0, np.nan, 1], dtype='O')
na_sentinel = -1
# TODO(wesm): unused?
ids = rizer.factorize(key, sort=False, na_sentinel=na_sentinel) # noqa
expected = np.array([2, -1, 0], dtype='int32')
assert len(set(key)) == len(set(expected))
tm.assert_numpy_array_equal(pd.isna(key), expected == na_sentinel)
@pytest.mark.parametrize("data,expected_label,expected_level", [
(
[(1, 1), (1, 2), (0, 0), (1, 2), 'nonsense'],
[0, 1, 2, 1, 3],
[(1, 1), (1, 2), (0, 0), 'nonsense']
),
(
[(1, 1), (1, 2), (0, 0), (1, 2), (1, 2, 3)],
[0, 1, 2, 1, 3],
[(1, 1), (1, 2), (0, 0), (1, 2, 3)]
),
(
[(1, 1), (1, 2), (0, 0), (1, 2)],
[0, 1, 2, 1],
[(1, 1), (1, 2), (0, 0)]
)
])
def test_factorize_tuple_list(self, data, expected_label, expected_level):
# GH9454
result = pd.factorize(data)
tm.assert_numpy_array_equal(result[0],
np.array(expected_label, dtype=np.intp))
expected_level_array = com.asarray_tuplesafe(expected_level,
dtype=object)
tm.assert_numpy_array_equal(result[1], expected_level_array)
def test_complex_sorting(self):
# gh 12666 - check no segfault
x17 = np.array([complex(i) for i in range(17)], dtype=object)
pytest.raises(TypeError, algos.factorize, x17[::-1], sort=True)
def test_float64_factorize(self, writable):
data = np.array([1.0, 1e8, 1.0, 1e-8, 1e8, 1.0], dtype=np.float64)
data.setflags(write=writable)
exp_labels = np.array([0, 1, 0, 2, 1, 0], dtype=np.intp)
exp_uniques = np.array([1.0, 1e8, 1e-8], dtype=np.float64)
labels, uniques = algos.factorize(data)
tm.assert_numpy_array_equal(labels, exp_labels)
tm.assert_numpy_array_equal(uniques, exp_uniques)
def test_uint64_factorize(self, writable):
data = np.array([2**64 - 1, 1, 2**64 - 1], dtype=np.uint64)
data.setflags(write=writable)
exp_labels = np.array([0, 1, 0], dtype=np.intp)
exp_uniques = np.array([2**64 - 1, 1], dtype=np.uint64)
labels, uniques = algos.factorize(data)
tm.assert_numpy_array_equal(labels, exp_labels)
tm.assert_numpy_array_equal(uniques, exp_uniques)
def test_int64_factorize(self, writable):
data = np.array([2**63 - 1, -2**63, 2**63 - 1], dtype=np.int64)
data.setflags(write=writable)
exp_labels = np.array([0, 1, 0], dtype=np.intp)
exp_uniques = np.array([2**63 - 1, -2**63], dtype=np.int64)
labels, uniques = algos.factorize(data)
tm.assert_numpy_array_equal(labels, exp_labels)
tm.assert_numpy_array_equal(uniques, exp_uniques)
def test_string_factorize(self, writable):
data = np.array(['a', 'c', 'a', 'b', 'c'],
dtype=object)
data.setflags(write=writable)
exp_labels = np.array([0, 1, 0, 2, 1], dtype=np.intp)
exp_uniques = np.array(['a', 'c', 'b'], dtype=object)
labels, uniques = algos.factorize(data)
tm.assert_numpy_array_equal(labels, exp_labels)
tm.assert_numpy_array_equal(uniques, exp_uniques)
def test_object_factorize(self, writable):
data = np.array(['a', 'c', None, np.nan, 'a', 'b', pd.NaT, 'c'],
dtype=object)
data.setflags(write=writable)
exp_labels = np.array([0, 1, -1, -1, 0, 2, -1, 1], dtype=np.intp)
exp_uniques = np.array(['a', 'c', 'b'], dtype=object)
labels, uniques = algos.factorize(data)
tm.assert_numpy_array_equal(labels, exp_labels)
tm.assert_numpy_array_equal(uniques, exp_uniques)
def test_deprecate_order(self):
# gh 19727 - check warning is raised for deprecated keyword, order.
# Test not valid once order keyword is removed.
data = np.array([2**63, 1, 2**63], dtype=np.uint64)
with tm.assert_produces_warning(expected_warning=FutureWarning):
algos.factorize(data, order=True)
with tm.assert_produces_warning(False):
algos.factorize(data)
@pytest.mark.parametrize('data', [
np.array([0, 1, 0], dtype='u8'),
np.array([-2**63, 1, -2**63], dtype='i8'),
np.array(['__nan__', 'foo', '__nan__'], dtype='object'),
])
def test_parametrized_factorize_na_value_default(self, data):
# arrays that include the NA default for that type, but isn't used.
l, u = algos.factorize(data)
expected_uniques = data[[0, 1]]
expected_labels = np.array([0, 1, 0], dtype=np.intp)
tm.assert_numpy_array_equal(l, expected_labels)
tm.assert_numpy_array_equal(u, expected_uniques)
@pytest.mark.parametrize('data, na_value', [
(np.array([0, 1, 0, 2], dtype='u8'), 0),
(np.array([1, 0, 1, 2], dtype='u8'), 1),
(np.array([-2**63, 1, -2**63, 0], dtype='i8'), -2**63),
(np.array([1, -2**63, 1, 0], dtype='i8'), 1),
(np.array(['a', '', 'a', 'b'], dtype=object), 'a'),
(np.array([(), ('a', 1), (), ('a', 2)], dtype=object), ()),
(np.array([('a', 1), (), ('a', 1), ('a', 2)], dtype=object),
('a', 1)),
])
def test_parametrized_factorize_na_value(self, data, na_value):
l, u = algos._factorize_array(data, na_value=na_value)
expected_uniques = data[[1, 3]]
expected_labels = np.array([-1, 0, -1, 1], dtype=np.intp)
tm.assert_numpy_array_equal(l, expected_labels)
tm.assert_numpy_array_equal(u, expected_uniques)
@pytest.mark.parametrize('sort', [True, False])
@pytest.mark.parametrize('na_sentinel', [-1, -10, 100])
def test_factorize_na_sentinel(self, sort, na_sentinel):
data = np.array(['b', 'a', None, 'b'], dtype=object)
labels, uniques = algos.factorize(data, sort=sort,
na_sentinel=na_sentinel)
if sort:
expected_labels = np.array([1, 0, na_sentinel, 1], dtype=np.intp)
expected_uniques = np.array(['a', 'b'], dtype=object)
else:
expected_labels = np.array([0, 1, na_sentinel, 0], dtype=np.intp)
expected_uniques = np.array(['b', 'a'], dtype=object)
tm.assert_numpy_array_equal(labels, expected_labels)
tm.assert_numpy_array_equal(uniques, expected_uniques)
class TestUnique(object):
def test_ints(self):
arr = np.random.randint(0, 100, size=50)
result = algos.unique(arr)
Loading ...