#
# Author: Damian Eads
# Date: April 17, 2008
#
# Copyright (C) 2008 Damian Eads
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
#
# 3. The name of the author may not be used to endorse or promote
# products derived from this software without specific prior
# written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from __future__ import division, print_function, absolute_import
import os.path
from functools import wraps, partial
from scipy._lib.six import xrange, u
import numpy as np
import warnings
from numpy.linalg import norm
from numpy.testing import (verbose, assert_,
assert_array_equal, assert_equal,
assert_almost_equal, assert_allclose)
import pytest
from pytest import raises as assert_raises
from scipy._lib._numpy_compat import suppress_warnings
from scipy.spatial.distance import (squareform, pdist, cdist, num_obs_y,
num_obs_dm, is_valid_dm, is_valid_y,
_validate_vector, _METRICS_NAMES)
# these were missing: chebyshev cityblock kulsinski
from scipy.spatial.distance import (braycurtis, canberra, chebyshev, cityblock,
correlation, cosine, dice, euclidean,
hamming, jaccard, jensenshannon,
kulsinski, mahalanobis, matching,
minkowski, rogerstanimoto, russellrao,
seuclidean, sokalmichener, sokalsneath,
sqeuclidean, yule)
from scipy.spatial.distance import wminkowski as old_wminkowski
# Text fixtures that load_testing_files() reads from the ``data`` directory
# located next to this file. Each entry becomes one key of ``eo``.
_filenames = [
    "cdist-X1.txt",
    "cdist-X2.txt",
    "iris.txt",
    "pdist-boolean-inp.txt",
    "pdist-chebyshev-ml-iris.txt",
    "pdist-chebyshev-ml.txt",
    "pdist-cityblock-ml-iris.txt",
    "pdist-cityblock-ml.txt",
    "pdist-correlation-ml-iris.txt",
    "pdist-correlation-ml.txt",
    "pdist-cosine-ml-iris.txt",
    "pdist-cosine-ml.txt",
    "pdist-double-inp.txt",
    "pdist-euclidean-ml-iris.txt",
    "pdist-euclidean-ml.txt",
    "pdist-hamming-ml.txt",
    "pdist-jaccard-ml.txt",
    "pdist-jensenshannon-ml-iris.txt",
    "pdist-jensenshannon-ml.txt",
    "pdist-minkowski-3.2-ml-iris.txt",
    "pdist-minkowski-3.2-ml.txt",
    "pdist-minkowski-5.8-ml-iris.txt",
    "pdist-seuclidean-ml-iris.txt",
    "pdist-seuclidean-ml.txt",
    "pdist-spearman-ml.txt",
    "random-bool-data.txt",
    "random-double-data.txt",
    "random-int-data.txt",
    "random-uint-data.txt",
]
# Small hand-written 6x6 matrix: symmetric, zero diagonal, stored as doubles.
_tdist = np.array([[0, 662, 877, 255, 412, 996],
                   [662, 0, 295, 468, 268, 400],
                   [877, 295, 0, 754, 564, 138],
                   [255, 468, 754, 0, 219, 869],
                   [412, 268, 564, 219, 0, 669],
                   [996, 400, 138, 869, 669, 0]], dtype='double')

# The same data after conversion by squareform().
_ytdist = squareform(_tdist)
# Dictionary of test fixtures, keyed by fixture name (the file name with
# ".txt" and "-ml" removed). The arrays come from a list of text files,
# which are read once, prior to testing. Each test looks up its inputs and
# expected outputs in this dictionary.
eo = {}
def load_testing_files():
    """Read every fixture listed in ``_filenames`` into the ``eo`` dictionary.

    Each file is looked up in the ``data`` directory next to this module and
    parsed with ``np.loadtxt``.  The dictionary key is the file name with
    ``".txt"`` and ``"-ml"`` removed, so ``"pdist-cosine-ml-iris.txt"`` is
    stored under ``"pdist-cosine-iris"``.

    After loading, the boolean, integer and unsigned fixtures are converted
    from the loaded arrays to the matching NumPy types, and a
    ``'random-float32-data'`` entry is derived from ``'random-double-data'``.
    """
    data_dir = os.path.join(os.path.dirname(__file__), 'data')
    for fn in _filenames:
        name = fn.replace(".txt", "").replace("-ml", "")
        fqfn = os.path.join(data_dir, fn)
        # The context manager closes the file even when loadtxt raises.
        with open(fqfn) as fp:
            eo[name] = np.loadtxt(fp)

    # Convert the fixtures that the tests need in a non-default dtype.
    eo['pdist-boolean-inp'] = np.bool_(eo['pdist-boolean-inp'])
    eo['random-bool-data'] = np.bool_(eo['random-bool-data'])
    eo['random-float32-data'] = np.float32(eo['random-double-data'])
    eo['random-int-data'] = np.int_(eo['random-int-data'])
    eo['random-uint-data'] = np.uint(eo['random-uint-data'])
# Populate ``eo`` once, at import time, so the tests can read from it.
load_testing_files()
def _chk_asarrays(arrays, axis=None):
arrays = [np.asanyarray(a) for a in arrays]
if axis is None:
# np < 1.10 ravel removes subclass from arrays
arrays = [np.ravel(a) if a.ndim != 1 else a
for a in arrays]
axis = 0
arrays = tuple(np.atleast_1d(a) for a in arrays)
if axis < 0:
if not all(a.ndim == arrays[0].ndim for a in arrays):
raise ValueError("array ndim must be the same for neg axis")
axis = range(arrays[0].ndim)[axis]
return arrays + (axis,)
def _chk_weights(arrays, weights=None, axis=None,
                 force_weights=False, simplify_weights=True,
                 pos_only=False, neg_check=False,
                 nan_screen=False, mask_screen=False,
                 ddof=None):
    """Validate and normalise data arrays together with their weights.

    Parameters
    ----------
    arrays : iterable of array_like
        Data arrays; converted through ``_chk_asarrays``.
    weights : array_like or None, optional
        Weights along `axis`.  When ``None`` and weights are not forced, the
        function returns early with ``None`` weights.
    axis : int or None, optional
        Axis along which the weights apply; resolved by ``_chk_asarrays``.
    force_weights : bool, optional
        Create unit weights when none were given, and disable
        `simplify_weights`.
    simplify_weights : bool, optional
        Replace weights that are all equal to 1 by ``None``.
    pos_only : bool, optional
        Drop the observations whose weight is not strictly positive.
    neg_check : bool, optional
        Raise if any weight is negative.
    nan_screen : bool, optional
        Mask invalid values in arrays whose sum is NaN; doing so also turns
        on `mask_screen` and `force_weights`.
    mask_screen : bool, optional
        Pass the weights through ``_weight_masked``; also forces weights if
        any array carries a mask.
    ddof : optional
        When truthy, the weights are passed through ``_freq_weights``.

    Returns
    -------
    tuple
        The arrays, followed by the weights (possibly ``None``) and the
        resolved axis.

    Raises
    ------
    ValueError
        If the weights' shape does not match the arrays along `axis`, or if
        `neg_check` is set and a weight is negative.
    """
    normalised = _chk_asarrays(arrays, axis=axis)
    arrays = normalised[:-1]
    axis = normalised[-1]

    # Decided here, before the screens below may switch force_weights on.
    simplify_weights = simplify_weights and not force_weights
    if mask_screen and not force_weights:
        force_weights = any(np.ma.getmask(a) is not np.ma.nomask
                            for a in arrays)

    if nan_screen:
        nan_flags = [np.isnan(np.sum(a)) for a in arrays]
        if any(nan_flags):
            mask_screen = force_weights = True
            arrays = tuple(np.ma.masked_invalid(a) if flagged else a
                           for a, flagged in zip(arrays, nan_flags))

    if weights is None:
        if not force_weights:
            # Nothing to validate: weights stays None.
            return arrays + (weights, axis)
        weights = np.ones(arrays[0].shape[axis])
    else:
        weights = np.asanyarray(weights)

    if ddof:
        weights = _freq_weights(weights)

    if mask_screen:
        weights = _weight_masked(arrays, weights, axis)

    if any(weights.shape != (a.shape[axis],) for a in arrays):
        raise ValueError("weights shape must match arrays along axis")
    if neg_check and (weights < 0).any():
        raise ValueError("weights cannot be negative")

    if pos_only:
        keep = np.nonzero(weights > 0)[0]
        if keep.size < weights.size:
            arrays = tuple(np.take(a, keep, axis=axis) for a in arrays)
            weights = weights[keep]

    if simplify_weights and (weights == 1).all():
        weights = None

    return arrays + (weights, axis)
def _freq_weights(weights):
if weights is None:
return weights
int_weights = weights.astype(int)
if (weights != int_weights).any():
raise ValueError("frequency (integer count-type) weights required %s" % weights)
return int_weights
def _weight_masked(arrays, weights, axis):
if axis is None:
axis = 0
weights = np.asanyarray(weights)
for a in arrays:
axis_mask = np.ma.getmask(a)
if axis_mask is np.ma.nomask:
continue
if a.ndim > 1:
not_axes = tuple(i for i in range(a.ndim) if i != axis)
axis_mask = axis_mask.any(axis=not_axes)
weights *= 1 - axis_mask.astype(int)
return weights
def within_tol(a, b, tol):
    """Return whether the largest absolute difference of a and b is below tol."""
    largest_gap = np.abs(a - b).max()
    return largest_gap < tol
def _assert_within_tol(a, b, atol=0, rtol=0, verbose_=False):
if verbose_:
print(np.abs(a - b).max())
assert_allclose(a, b, rtol=rtol, atol=atol)
def _rand_split(arrays, weights, axis, split_per, seed=None):
# inverse operation for stats.collapse_weights
weights = np.array(weights, dtype=np.float64) # modified inplace; need a copy
seeded_rand = np.random.RandomState(seed)
def mytake(a, ix, axis):
record = np.asanyarray(np.take(a, ix, axis=axis))
return record.reshape([a.shape[i] if i != axis else 1
for i in range(a.ndim)])
n_obs = arrays[0].shape[axis]
assert all(a.shape[axis] == n_obs for a in arrays), "data must be aligned on sample axis"
for i in range(int(split_per) * n_obs):
split_ix = seeded_rand.randint(n_obs + i)
prev_w = weights[split_ix]
q = seeded_rand.rand()
weights[split_ix] = q * prev_w
weights = np.append(weights, (1. - q) * prev_w)
arrays = [np.append(a, mytake(a, split_ix, axis=axis),
axis=axis) for a in arrays]
return arrays, weights
def _rough_check(a, b, compare_assert=partial(assert_allclose, atol=1e-5),
key=lambda x: x, w=None):
check_a = key(a)
check_b = key(b)
try:
if np.array(check_a != check_b).any(): # try strict equality for string types
compare_assert(check_a, check_b)
except AttributeError: # masked array
compare_assert(check_a, check_b)
except (TypeError, ValueError): # nested data structure
for a_i, b_i in zip(check_a, check_b):
_rough_check(a_i, b_i, compare_assert=compare_assert)
# Differences from the corresponding helper in test_stats (defaults used here):
# n_args=2, weight_arg='w', default_axis=None
# ma_safe = False, nan_safe = False
def _weight_checked(fn, n_args=2, default_axis=None, key=lambda x: x, weight_arg='w',
squeeze=True, silent=False,
ones_test=True, const_test=True, dup_test=True,
split_test=True, dud_test=True, ma_safe=False, ma_very_safe=False, nan_safe=False,
split_per=1.0, seed=0, compare_assert=partial(assert_allclose, atol=1e-5)):
"""runs fn on its arguments 2 or 3 ways, checks that the results are the same,
then returns the same thing it would have returned before"""
@wraps(fn)
def wrapped(*args, **kwargs):
result = fn(*args, **kwargs)
arrays = args[:n_args]
rest = args[n_args:]
weights = kwargs.get(weight_arg, None)
axis = kwargs.get('axis', default_axis)
chked = _chk_weights(arrays, weights=weights, axis=axis, force_weights=True, mask_screen=True)
arrays, weights, axis = chked[:-2], chked[-2], chked[-1]
if squeeze:
arrays = [np.atleast_1d(a.squeeze()) for a in arrays]
try:
# WEIGHTS CHECK 1: EQUAL WEIGHTED OBSERVATIONS
args = tuple(arrays) + rest
if ones_test:
kwargs[weight_arg] = weights
_rough_check(result, fn(*args, **kwargs), key=key)
if const_test:
kwargs[weight_arg] = weights * 101.0
_rough_check(result, fn(*args, **kwargs), key=key)
kwargs[weight_arg] = weights * 0.101
try:
_rough_check(result, fn(*args, **kwargs), key=key)
except Exception as e:
raise type(e)((e, arrays, weights))
# WEIGHTS CHECK 2: ADDL 0-WEIGHTED OBS
if dud_test:
# add randomly resampled rows, weighted at 0
dud_arrays, dud_weights = _rand_split(arrays, weights, axis, split_per=split_per, seed=seed)
dud_weights[:weights.size] = weights # not exactly 1 because of masked arrays
dud_weights[weights.size:] = 0
dud_args = tuple(dud_arrays) + rest
kwargs[weight_arg] = dud_weights
_rough_check(result, fn(*dud_args, **kwargs), key=key)
# increase the value of those 0-weighted rows
for a in dud_arrays:
indexer = [slice(None)] * a.ndim
indexer[axis] = slice(weights.size, None)
indexer = tuple(indexer)
a[indexer] = a[indexer] * 101
dud_args = tuple(dud_arrays) + rest
_rough_check(result, fn(*dud_args, **kwargs), key=key)
# set those 0-weighted rows to NaNs
for a in dud_arrays:
indexer = [slice(None)] * a.ndim
indexer[axis] = slice(weights.size, None)
indexer = tuple(indexer)
a[indexer] = a[indexer] * np.nan
if kwargs.get("nan_policy", None) == "omit" and nan_safe:
dud_args = tuple(dud_arrays) + rest
_rough_check(result, fn(*dud_args, **kwargs), key=key)
# mask out those nan values
if ma_safe:
dud_arrays = [np.ma.masked_invalid(a) for a in dud_arrays]
dud_args = tuple(dud_arrays) + rest
_rough_check(result, fn(*dud_args, **kwargs), key=key)
if ma_very_safe:
kwargs[weight_arg] = None
_rough_check(result, fn(*dud_args, **kwargs), key=key)
del dud_arrays, dud_args, dud_weights
# WEIGHTS CHECK 3: DUPLICATE DATA (DUMB SPLITTING)
if dup_test:
dup_arrays = [np.append(a, a, axis=axis) for a in arrays]
dup_weights = np.append(weights, weights) / 2.0
dup_args = tuple(dup_arrays) + rest
kwargs[weight_arg] = dup_weights
_rough_check(result, fn(*dup_args, **kwargs), key=key)
del dup_args, dup_arrays, dup_weights
Loading ...